pydgraph
This is the official Dgraph database client implementation for Python (Python >= v3.9), using
gRPC.
Before using this client, we highly recommend that you read the product documentation at
https://docs.dgraph.io/.
Install
Install using pip:
pip install pydgraph
Protobuf Version Compatibility
pydgraph supports protobuf versions 4.23.0 through 6.x. The specific version installed depends on
your environment:
- Modern environments: protobuf 6.x is recommended and will be installed by default on Python
3.13+
- Legacy environments: If you need to use protobuf 4.x or 5.x (e.g., for compatibility with
other packages), you can pin the version:
pip install pydgraph "protobuf>=4.23.0,<5.0.0"
pip install pydgraph "protobuf>=5.0.0,<6.0.0"
All supported protobuf versions are tested in CI against both Dgraph latest and Dgraph HEAD.
Supported Versions
Depending on the version of Dgraph that you are connecting to, you should use a different version of
this client. Using an incompatible version may lead to unexpected behavior or errors.
| Dgraph version | pydgraph version |
| 21.03.x | 21.03.x |
| 23.0.x+ | 23.0.x |
| 24.0.x+ | 24.0.x |
| 25.0.x+ | 25.0.x |
Quickstart
Build and run the simple project in the examples folder, which contains an end-to-end
example of using the Dgraph python client. For additional details, follow the instructions in the
project's README.
Using a client
Creating a Client
You can initialize a DgraphClient object by passing it one or more DgraphClientStub instances as
variadic arguments. Connecting to multiple Dgraph servers in the same cluster allows for better
distribution of workload.
The following code snippet shows just one connection.
import pydgraph
client_stub = pydgraph.DgraphClientStub('localhost:9080')
client = pydgraph.DgraphClient(client_stub)
Using Dgraph Connection Strings
The pydgraph package supports connecting to a Dgraph cluster using connection strings. Dgraph
connection strings take the form dgraph://{username:password@}host:port?args.
username and password are optional. If username is provided, a password must also be present. If
supplied, these credentials are used to log into a Dgraph cluster through the ACL mechanism.
Valid connection string args:
| Arg | Value | Description |
| bearertoken | <token> | an access token |
| sslmode | disable | require | verify-ca | TLS option, the default is disable. If verify-ca is set, the TLS certificate configured in the Dgraph cluster must be from a valid certificate authority. |
| namespace | <namespace> | a previously created integer-based namespace; username and password must be supplied |
Note: the sslmode=require option is not supported and will raise an Exception if used. The Python
gRPC library does not support TLS traffic that skips full verification of the certificate and
domain. For self-signed certificates, developers should use the existing stub/client initialization
steps as demonstrated in examples/tls/tls_example.py.
Some example connection strings:
| Connection string | Purpose |
| dgraph://localhost:9080 | Connect to localhost, no ACL, no TLS |
| dgraph://sally:supersecret@dg.example.com:443?sslmode=verify-ca | Connect to a remote server, use ACL, and require TLS with a valid certificate from a CA |
| dgraph://foo-bar.grpc.dgraph-io.com:443?sslmode=verify-ca&bearertoken=<some access token> | Connect to a Dgraph cluster protected by a secure gateway |
| dgraph://sally:supersecret@dg.example.com:443?namespace=2 | Connect to an ACL-enabled Dgraph cluster in namespace 2 |
Using the open function with a connection string:
client = pydgraph.open("dgraph://groot:password@localhost:8090")
...
client.close()
Login into a Namespace
If your server has Access Control Lists enabled (Dgraph v1.1 or above), the client must be logged in
to access data. If you didn't use the open function with credentials and a namespace, use the
login endpoint.
Calling login will obtain and remember the access and refresh JWT tokens. All subsequent
operations via the logged in client will send along the stored access token.
client.login("groot", "password")
If your server additionally has namespaces (Dgraph v21.03 or above), use the login_into_namespace
API.
client.login_into_namespace("groot", "password", "123")
Altering the Database
Set the Dgraph types schema
To set the Dgraph types schema (aka DQL schema), create an Operation object, set the schema, and
pass it to the DgraphClient#alter(Operation) method.
schema = 'name: string @index(exact) .'
op = pydgraph.Operation(schema=schema)
client.alter(op)
Indexes can be computed in the background. You can set the run_in_background field of
pydgraph.Operation to True before passing it to the Alter function. You can find more details
in the
Dgraph documentation on background indexes.
Note: To deploy a GraphQL schema from Python, use a GraphQL client such as
python-graphql-client to invoke the
GraphQL admin mutation updateGQLSchema.
schema = 'name: string @index(exact) .'
op = pydgraph.Operation(schema=schema, run_in_background=True)
client.alter(op)
Drop data
To drop all data and schema:
op = pydgraph.Operation(drop_all=True)
client.alter(op)
Note If the Dgraph cluster contains a GraphQL Schema, it will also be deleted by this operation.
To drop all data and preserve the DQL schema:
op = pydgraph.Operation(drop_op="DATA")
client.alter(op)
To drop a predicate:
op = pydgraph.Operation(drop_op="ATTR", drop_value="<predicate_name>")
client.alter(op)
The same result can be obtained using:
op = pydgraph.Operation(drop_attr="<predicate_name>")
client.alter(op)
To drop a type definition from DQL Schema:
op = pydgraph.Operation(drop_op="TYPE", drop_value="<type_name>")
client.alter(op)
Note: drop_op="TYPE" only removes the type definition from the DQL schema. No data is removed
from the cluster, and the predicates associated with the type are not dropped.
Creating a Transaction
To create a transaction, call the DgraphClient#txn() method, which returns a new Txn object.
This operation incurs no network overhead.
It is good practice to call Txn#discard() in a finally block after running the transaction.
Calling Txn#discard() after Txn#commit() is a no-op and you can call Txn#discard() multiple
times with no additional side-effects.
txn = client.txn()
try:
    # Run queries and mutations here
    ...
finally:
    txn.discard()
To create a read-only transaction, call DgraphClient#txn(read_only=True). Read-only transactions
are ideal for transactions which only involve queries. Mutations and commits are not allowed.
txn = client.txn(read_only=True)
try:
    # Run queries here
    ...
finally:
    txn.discard()
To create a read-only transaction that executes best-effort queries, call
DgraphClient#txn(read_only=True, best_effort=True). Best-effort queries are faster than normal
queries because they bypass the normal consensus protocol. For this same reason, best-effort queries
cannot guarantee to return the latest data. Best-effort queries are only supported by read-only
transactions.
Running a Mutation
Txn#mutate(mu=Mutation) runs a mutation. It takes in a Mutation object, which provides two main
ways to set data: JSON and RDF N-Quad. You can choose whichever way is convenient.
Txn#mutate() provides convenience keyword arguments set_obj and del_obj for setting JSON
values and set_nquads and del_nquads for setting N-Quad values. See examples below for usage.
We define a person object to represent a person and use it in a transaction.
p = { 'name': 'Alice' }
txn.mutate(set_obj=p)
To delete nodes, first query for them, then pass each result object to del_obj:
query = """query all($a: string)
{
    all(func: eq(name, $a))
    {
        uid
    }
}"""
variables = {'$a': 'Bob'}
res = txn.query(query, variables=variables)
ppl = json.loads(res.json)
for person in ppl['all']:
    txn.mutate(del_obj=person)
For a complete example with multiple fields and relationships, look at the simple project
in the examples folder.
Sometimes, you only want to commit a mutation, without querying anything further. In such cases, you
can set the keyword argument commit_now=True to indicate that the mutation must be immediately
committed.
A mutation can be executed using txn.do_request as well.
mutation = txn.create_mutation(set_nquads='_:alice <name> "Alice" .')
request = txn.create_request(mutations=[mutation], commit_now=True)
txn.do_request(request)
Committing a Transaction
A transaction can be committed using the Txn#commit() method. If your transaction consists solely
of calls to Txn#query, with no calls to Txn#mutate, then calling Txn#commit() is not necessary.
An error is raised if another transaction concurrently modifies data that was also modified in the
current transaction. It is up to the user to retry transactions when they fail.
txn = client.txn()
try:
    # Run mutations here
    txn.commit()
except pydgraph.AbortedError:
    # The transaction conflicted with another; retry or handle the error
    pass
finally:
    txn.discard()
Using Transaction with Context Manager
The Python context manager automatically commits the transaction after all queries and mutations
have completed, and then discards it to clean up. If something goes wrong inside the scope of the
context manager, commit is not called and discard drops any pending changes.
with client.begin(read_only=False, best_effort=False) as txn:
    # queries and mutations go here
    ...
or you can directly create a transaction from the Txn class.
with pydgraph.Txn(client, read_only=False, best_effort=False) as txn:
    # queries and mutations go here
    ...
client.begin() can only be used in "with-as" blocks, while the pydgraph.Txn class can be
instantiated directly to obtain a transaction object.
Running a Query
You can run a query by calling Txn#query(string). You will need to pass in a
DQL query string. If you want to pass an additional dictionary of any
variables that you might want to set in the query, call Txn#query(string, variables=d) with the
variables dictionary d.
The query response contains the json field, which returns the JSON response. Let’s run a query
with a variable $a, deserialize the result from JSON and print it out:
query = """query all($a: string) {
    all(func: eq(name, $a))
    {
        name
    }
}"""
variables = {'$a': 'Alice'}
res = txn.query(query, variables=variables)
ppl = json.loads(res.json)
print('Number of people named "Alice": {}'.format(len(ppl['all'])))
for person in ppl['all']:
    print(person)
This should print:
Number of people named "Alice": 1
{'name': 'Alice'}
You can also use the txn.do_request function to run the query.
request = txn.create_request(query=query)
txn.do_request(request)
Query with RDF response
You can get the query result as an RDF response by calling Txn#query(string) with resp_format set
to "RDF". The response contains an rdf field, which holds the RDF-encoded result.
Note: If you are querying only for uid values, use a JSON format response.
res = txn.query(query, variables=variables, resp_format="RDF")
print(res.rdf)
Running an Upsert: Query + Mutation
The txn.do_request function allows you to use upsert blocks. An upsert block contains one query
block and one or more mutation blocks, so it lets you perform queries and mutations in a single
request. Variables defined in the query block can be used in the mutation blocks using the uid and
val functions implemented by DQL.
To learn more about upsert blocks, see the
Upsert Block documentation.
query = """{
u as var(func: eq(name, "Alice"))
}"""
nquad = """
uid(u) <name> "Alice" .
uid(u) <age> "25" .
"""
mutation = txn.create_mutation(set_nquads=nquad)
request = txn.create_request(query=query, mutations=[mutation], commit_now=True)
txn.do_request(request)
Running a Conditional Upsert
The upsert block also allows specifying a conditional mutation block using an @if directive. The
mutation is executed only when the specified condition is true. If the condition is false, the
mutation is silently ignored.
See more about
conditional upserts in the Dgraph documentation.
query = """
{
user as var(func: eq(email, "wrong_email@dgraph.io"))
}
"""
cond = "@if(eq(len(user), 1))"
nquads = """
uid(user) <email> "correct_email@dgraph.io" .
"""
mutation = txn.create_mutation(cond=cond, set_nquads=nquads)
request = txn.create_request(mutations=[mutation], query=query, commit_now=True)
txn.do_request(request)
Cleaning Up Resources
To clean up resources, you have to call DgraphClientStub#close() individually for all the
instances of DgraphClientStub.
SERVER_ADDR1 = "localhost:9080"
SERVER_ADDR2 = "localhost:9080"
stub1 = pydgraph.DgraphClientStub(SERVER_ADDR1)
stub2 = pydgraph.DgraphClientStub(SERVER_ADDR2)
client = pydgraph.DgraphClient(stub1, stub2)
...
stub1.close()
stub2.close()
Use context manager to automatically clean resources
Use function call:
with pydgraph.client_stub(SERVER_ADDR) as stub1:
    with pydgraph.client_stub(SERVER_ADDR) as stub2:
        client = pydgraph.DgraphClient(stub1, stub2)
Use class constructor:
with pydgraph.DgraphClientStub(SERVER_ADDR) as stub1:
    with pydgraph.DgraphClientStub(SERVER_ADDR) as stub2:
        client = pydgraph.DgraphClient(stub1, stub2)
Note: client should only be used inside the "with-as" block. The resources associated with
client are released automatically when the block exits, and the client is no longer usable after
that.
Setting Metadata Headers
Metadata headers such as authentication tokens can be set through the metadata argument of gRPC
methods. Below is an example of how to set a header named "auth-token".
metadata = [("auth-token", "the-auth-token-value")]
dg.alter(op, metadata=metadata)
Setting a timeout
A timeout value representing the number of seconds can be passed to the login, alter, query,
and mutate methods using the timeout keyword argument.
For example, the following alters the schema with a timeout of ten seconds:
dg.alter(op, timeout=10)
Async methods
The alter method in the client has an asynchronous version called async_alter. The async methods
return a future. You can call the result method on the future directly; however, the DgraphClient
class provides a static method handle_alter_future to handle any possible exception.
alter_future = client.async_alter(pydgraph.Operation(schema="name: string @index(term) ."))
response = pydgraph.DgraphClient.handle_alter_future(alter_future)
The query and mutate methods in the Txn class also have async versions called async_query
and async_mutate respectively. These functions work just like async_alter.
You can use the handle_query_future and handle_mutate_future static methods in the Txn class
to retrieve the result. A short example is given below:
txn = client.txn()
query = "query body here"
future = txn.async_query(query)
response = pydgraph.Txn.handle_query_future(future)
Keep in mind that due to the nature of async calls, the async functions cannot retry the request if
the login is invalid. You will have to check for this error and retry the login (with the function
retry_login in both the Txn and Client classes). A short example is given below:
client = pydgraph.DgraphClient(client_stub)
operation = pydgraph.Operation(schema="name: string @index(term) .")
alter_future = client.async_alter(operation)
try:
    response = alter_future.result()
except Exception as e:
    if pydgraph.util.is_jwt_expired(e):
        client.retry_login()
        alter_future = client.async_alter(operation)
        response = alter_future.result()
Native Async/Await Client
pydgraph provides a native async/await client using Python's asyncio library and grpc.aio. This
provides true asynchronous operations with better concurrency compared to the futures-based approach
above.
Basic Usage
import asyncio
import pydgraph

async def main():
    client_stub = pydgraph.AsyncDgraphClientStub('localhost:9080')
    client = pydgraph.AsyncDgraphClient(client_stub)
    try:
        await client.login("groot", "password")
        await client.alter(pydgraph.Operation(
            schema="name: string @index(term) ."
        ))

        txn = client.txn()
        response = await txn.mutate(
            set_obj={"name": "Alice"},
            commit_now=True
        )

        query = '{ me(func: has(name)) { name } }'
        txn = client.txn(read_only=True)
        response = await txn.query(query)
        print(response.json)
    finally:
        await client.close()

asyncio.run(main())
Using Connection Strings
The async client supports the same connection string format as the sync client:
import asyncio
import pydgraph

async def main():
    async with await pydgraph.async_open(
        "dgraph://groot:password@localhost:9080"
    ) as client:
        version = await client.check_version()
        print(f"Connected to Dgraph version: {version}")

asyncio.run(main())
Using Context Managers
Both the async client and transactions support async context managers for automatic resource
cleanup:
import asyncio
import pydgraph

async def main():
    async with await pydgraph.async_open("dgraph://localhost:9080") as client:
        await client.login("groot", "password")
        async with client.txn() as txn:
            response = await txn.query('{ me(func: has(name)) { name } }')
            print(response.json)

asyncio.run(main())
Concurrent Operations
The async client excels at running many operations concurrently:
import asyncio
import pydgraph

async def run_query(client, name):
    """Run a single query"""
    query = f'{{ me(func: eq(name, "{name}")) {{ name }} }}'
    txn = client.txn(read_only=True)
    return await txn.query(query)

async def main():
    async with await pydgraph.async_open("dgraph://localhost:9080") as client:
        await client.login("groot", "password")
        names = [f"User{i}" for i in range(100)]
        tasks = [run_query(client, name) for name in names]
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} queries concurrently")

asyncio.run(main())
JWT Refresh
The async client automatically handles JWT token refresh, just like the sync client:
async with await pydgraph.async_open("dgraph://groot:password@localhost:9080") as client:
    response = await client.alter(pydgraph.Operation(schema="name: string ."))
Error Handling
Error handling works the same as the sync client:
import asyncio
import pydgraph

async def main():
    async with await pydgraph.async_open("dgraph://localhost:9080") as client:
        try:
            await client.login("groot", "wrong_password")
        except Exception as e:
            print(f"Login failed: {e}")

        try:
            txn = client.txn(read_only=True)
            await txn.mutate(set_obj={"name": "Alice"})
        except pydgraph.errors.TransactionError as e:
            print(f"Cannot mutate in read-only transaction: {e}")

asyncio.run(main())
Differences from Sync Client
| Feature | Sync client | Async client |
| Import | pydgraph.DgraphClient | pydgraph.AsyncDgraphClient |
| Connection function | pydgraph.open() | await pydgraph.async_open() |
| Method calls | client.query() | await client.query() |
| Context manager | with client.txn() as txn: | async with client.txn() as txn: |
| Concurrency | Threading | Native asyncio |
| JWT refresh | Automatic | Automatic |
Handling Transaction Conflicts
Dgraph uses optimistic concurrency control backed by multi-version concurrency control (MVCC).
When multiple transactions modify the same data simultaneously, a conflict occurs and Dgraph aborts
one of the transactions with an AbortedError. When this happens, the entire transaction must be
retried from scratch.
pydgraph provides built-in retry utilities with exponential backoff to handle these conflicts
automatically.
Using run_transaction (Recommended)
The simplest approach is to pass your operation as a callable:
import pydgraph

def create_user(txn):
    """Transaction operation that will be retried on conflict."""
    response = txn.mutate(set_obj={"name": "Alice", "age": 30})
    txn.commit()
    return response.uids

client = pydgraph.DgraphClient(pydgraph.DgraphClientStub("localhost:9080"))
result = pydgraph.run_transaction(client, create_user, max_retries=5)
print(f"Created user: {result}")
For async code:
async def create_user_async(txn):
    response = await txn.mutate(set_obj={"name": "Alice", "age": 30})
    await txn.commit()
    return response.uids

result = await pydgraph.run_transaction_async(client, create_user_async)
Using the Retry Decorator
Wrap any function that performs Dgraph operations:
import json
import pydgraph

@pydgraph.with_retry(max_retries=5, base_delay=0.1)
def upsert_counter(client, counter_id):
    """Increment a counter atomically - automatically retried on conflict."""
    txn = client.txn()
    try:
        query = f'{{ counter(func: uid({counter_id})) {{ value }} }}'
        result = txn.query(query)
        current = json.loads(result.json).get("counter", [{}])[0].get("value", 0)
        txn.mutate(set_obj={"uid": counter_id, "value": current + 1})
        txn.commit()
    finally:
        txn.discard()

upsert_counter(client, "0x123")
For async functions:
@pydgraph.with_retry_async(max_retries=5)
async def upsert_counter_async(client, counter_id):
    async with client.txn() as txn:
        # async query and mutate here
        ...
Using the Retry Generator
For fine-grained control within a function:
import pydgraph
def transfer_funds(client, from_account, to_account, amount):
    """Transfer funds between accounts with manual retry control."""
    for attempt in pydgraph.retry(max_retries=5, base_delay=0.1):
        with attempt:
            txn = client.txn()
            try:
                # query balances, validate, and mutate here
                txn.commit()
            finally:
                txn.discard()
For async code:
async def transfer_funds_async(client, from_account, to_account, amount):
    async for attempt in pydgraph.retry_async(max_retries=5):
        with attempt:
            async with client.txn() as txn:
                # query balances, validate, and mutate here
                ...
Retry Parameters
All retry utilities accept these parameters:
| Parameter | Default | Description |
| max_retries | 5 | Maximum number of retry attempts |
| base_delay | 0.1 | Initial delay in seconds between retries |
| max_delay | 5.0 | Maximum delay cap in seconds |
| jitter | 0.1 | Random jitter factor (0-1) to prevent thundering herd |
Which Errors Are Retried?
Only these errors trigger automatic retries:
- pydgraph.AbortedError - transaction conflict (optimistic concurrency)
- pydgraph.RetriableError - transient server errors
All other exceptions propagate immediately.
Example: High-Contention Counter
Here's a complete example handling a high-contention scenario:
import json
import pydgraph
def increment_counter(client, counter_uid):
"""Atomically increment a counter, handling conflicts automatically."""
def operation(txn):
query = f'{{ counter(func: uid({counter_uid})) {{ count }} }}'
result = txn.query(query)
data = json.loads(result.json)
current = data.get("counter", [{}])[0].get("count", 0)
txn.mutate(set_obj={"uid": counter_uid, "count": current + 1})
txn.commit()
return current + 1
return pydgraph.run_transaction(
client, operation,
max_retries=10,
base_delay=0.05,
max_delay=2.0,
jitter=0.25
)
client = pydgraph.DgraphClient(pydgraph.DgraphClientStub("localhost:9080"))
new_value = increment_counter(client, "0x1")
print(f"Counter is now: {new_value}")
Examples
- simple: Quickstart example of using pydgraph.
- tls: Quickstart example that uses TLS.
- parse_datetime: Demonstration of converting Dgraph's DateTime strings to native python datetime.
Contributing
We welcome contributions! Please see CONTRIBUTING.md for detailed information on:
- Setting up your development environment
- Code style and standards
- Testing procedures
- Submitting pull requests