InfluxDB Client Go
This repository contains the Go client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.
For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.
Features
- InfluxDB 2 client
    - Querying data
        - using the Flux language
        - into raw data, flux table representation
        - How to query
    - Writing data using points or line protocol records
    - InfluxDB 2 API
        - setup, ready, health
        - authorizations, users, organizations
        - buckets, delete
        - ...
Documentation
This section contains links to the client library documentation.
Examples
Examples of basic writing and querying of data are shown below in this document.
There are also other examples in the API docs.
How To Use
Installation
Go 1.17 or later is required.
Go mod project
- Add the latest version of the client package to your project dependencies (go.mod).
go get github.com/influxdata/influxdb-client-go/v2
- Add the import github.com/influxdata/influxdb-client-go/v2 to your source code.
GOPATH project
```sh
go get github.com/influxdata/influxdb-client-go
```
Note: To run go get in GOPATH mode, the environment variable GO111MODULE must have the value off.
Basic Example
The following example demonstrates how to write data to InfluxDB 2 and read it back using the Flux language:
package main
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
p := influxdb2.NewPoint("stat",
map[string]string{"unit": "temperature"},
map[string]interface{}{"avg": 24.5, "max": 45.0},
time.Now())
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
panic(err)
}
p = influxdb2.NewPointWithMeasurement("stat").
AddTag("unit", "temperature").
AddField("avg", 23.2).
AddField("max", 45.0).
SetTime(time.Now())
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
panic(err)
}
line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
err = writeAPI.WriteRecord(context.Background(), line)
if err != nil {
panic(err)
}
queryAPI := client.QueryAPI("my-org")
result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
if err == nil {
for result.Next() {
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("row: %s\n", result.Record().String())
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
client.Close()
}
Options
The InfluxDBClient uses a set of options to configure its behavior. These are available in the Options object.
Creating a client instance using
client := influxdb2.NewClient("http://localhost:8086", "my-token")
will use the default options.
To set different configuration values, e.g. to enable gzip compression and trust all server certificates, get the default options and change what is needed:
client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
influxdb2.DefaultOptions().
SetUseGZip(true).
SetTLSConfig(&tls.Config{
InsecureSkipVerify: true,
}))
Writes
The client offers two ways of writing: non-blocking and blocking.
Non-blocking write client
The non-blocking write client uses implicit batching. Data is written asynchronously to an underlying buffer and is automatically sent to the server when the size of the write buffer reaches the batch size (default 5000) or when the flush interval (default 1s) elapses.
Writes are automatically retried on server back pressure.
This write client also offers a synchronous, blocking Flush() method, which ensures that the write buffer is flushed and all pending writes are finished.
Always use the Close() method of the client to stop all background processes.
The asynchronous write client is recommended for frequent periodic writes.
package main
import (
"fmt"
"math/rand"
"time"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
influxdb2.DefaultOptions().SetBatchSize(20))
writeAPI := client.WriteAPI("my-org","my-bucket")
for i := 0; i <100; i++ {
p := influxdb2.NewPoint(
"system",
map[string]string{
"id": fmt.Sprintf("rack_%v", i%10),
"vendor": "AWS",
"hostname": fmt.Sprintf("host_%v", i%100),
},
map[string]interface{}{
"temperature": rand.Float64() * 80.0,
"disk_free": rand.Float64() * 1000.0,
"disk_total": (i/10 + 1) * 1000000,
"mem_total": (i/100 + 1) * 10000000,
"mem_free": rand.Uint64(),
},
time.Now())
writeAPI.WritePoint(p)
}
writeAPI.Flush()
client.Close()
}
Handling of failed async writes
By default, WriteAPI keeps retrying failed writes.
Writes are retried automatically when they fail because of a connection failure or when the server returns a response with HTTP status code >= 429.
The retry algorithm uses a random exponential strategy to set the retry time.
The delay before the next retry attempt is a random value in the interval from retryInterval * exponentialBase^(attempts) to retryInterval * exponentialBase^(attempts+1).
If writes of a batch repeatedly fail, WriteAPI keeps retrying until maxRetries is reached or the overall retry time of the batch exceeds maxRetryTime.
The default parameters (part of the WriteOptions) are:
- retryInterval=5,000ms
- exponentialBase=2
- maxRetryDelay=125,000ms
- maxRetries=5
- maxRetryTime=180,000ms
By default, retry delays (in milliseconds) are randomly distributed within the following ranges:
- 5,000-10,000
- 10,000-20,000
- 20,000-40,000
- 40,000-80,000
- 80,000-125,000
Setting retryInterval to 0 disables the retry strategy, and any failed write discards the batch.
WriteFailedCallback allows advanced control of retrying.
It is notified synchronously when an async write fails.
Its return value controls further handling of the batch: if it returns true, WriteAPI continues retrying writes of this batch; if it returns false, the batch is discarded.
Reading async errors
WriteAPI automatically logs write errors. To send write errors to a custom target, use the Errors() method, which returns a channel for reading errors that occur during async writes:
package main
import (
"fmt"
"math/rand"
"time"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
writeAPI := client.WriteAPI("my-org", "my-bucket")
errorsCh := writeAPI.Errors()
go func() {
for err := range errorsCh {
fmt.Printf("write error: %s\n", err.Error())
}
}()
for i := 0; i < 100; i++ {
p := influxdb2.NewPointWithMeasurement("stat").
AddTag("id", fmt.Sprintf("rack_%v", i%10)).
AddTag("vendor", "AWS").
AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
AddField("temperature", rand.Float64()*80.0).
AddField("disk_free", rand.Float64()*1000.0).
AddField("disk_total", (i/10+1)*1000000).
AddField("mem_total", (i/100+1)*10000000).
AddField("mem_free", rand.Uint64()).
SetTime(time.Now())
writeAPI.WritePoint(p)
}
writeAPI.Flush()
client.Close()
}
Blocking write client
The blocking write client writes the given point(s) synchronously. By default it doesn't do implicit batching; a batch is created from the given set of points.
Implicit batching can be enabled with WriteAPIBlocking.EnableBatching().
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
writeAPI := client.WriteAPIBlocking("my-org","my-bucket")
for i := 0; i <100; i++ {
p := influxdb2.NewPoint(
"system",
map[string]string{
"id": fmt.Sprintf("rack_%v", i%10),
"vendor": "AWS",
"hostname": fmt.Sprintf("host_%v", i%100),
},
map[string]interface{}{
"temperature": rand.Float64() * 80.0,
"disk_free": rand.Float64() * 1000.0,
"disk_total": (i/10 + 1) * 1000000,
"mem_total": (i/100 + 1) * 10000000,
"mem_free": rand.Uint64(),
},
time.Now())
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
panic(err)
}
}
client.Close()
}
Queries
The query client can return query results either as a parsed representation in a QueryTableResult or as a raw string.
QueryTableResult
QueryTableResult offers a convenient way to deal with the Flux query CSV response. It parses the CSV stream into FluxTableMetaData, FluxColumn and FluxRecord objects for easy reading of the result.
package main
import (
"context"
"fmt"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
queryAPI := client.QueryAPI("my-org")
result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
if err == nil {
for result.Next() {
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("value: %v\n", result.Record().Value())
}
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
client.Close()
}
Raw
QueryRaw() returns the raw, unparsed query result as a string, which you can then process on your own. The returned CSV format can be controlled by the third parameter, the query dialect.
package main
import (
"context"
"fmt"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
queryAPI := client.QueryAPI("my-org")
result, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
if err == nil {
fmt.Println("QueryResult:")
fmt.Println(result)
} else {
panic(err)
}
client.Close()
}
Parameterized Queries
InfluxDB Cloud supports Parameterized Queries that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more reusable and can also be used to help prevent injection attacks.
InfluxDB Cloud inserts the params object into the Flux query as a Flux record named params. Use dot or bracket notation to access parameters in the params record in your Flux query. Parameterized Flux queries support only int, float, and string data types. To convert the supported data types into other Flux basic data types, use Flux type conversion functions.
Query parameters can be passed as a struct or map. Parameter values can be only simple types or time.Time.
The name of the parameter represented by a struct field can be specified by a JSON annotation.
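To illustrate how JSON annotations determine parameter names, the sketch below marshals a struct and an equivalent map with the standard encoding/json package. It assumes the parameters are serialized with encoding/json semantics, which the use of JSON annotations suggests; queryParams and paramsJSON are hypothetical names for this illustration and are not part of the client library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queryParams is a hypothetical parameter struct; the json tags set the
// names under which the values would appear in the params record.
type queryParams struct {
	Start string  `json:"start"`
	Field string  `json:"field"`
	Value float64 `json:"value"`
}

// paramsJSON renders a struct or map as a JSON object string.
func paramsJSON(p interface{}) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	fromStruct, err := paramsJSON(queryParams{Start: "-1h", Field: "temperature", Value: 25})
	if err != nil {
		panic(err)
	}
	fromMap, err := paramsJSON(map[string]interface{}{
		"start": "-1h",
		"field": "temperature",
		"value": 25.0,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(fromStruct) // struct fields keep declaration order
	fmt.Println(fromMap)    // map keys are emitted in sorted order
}
```

Both forms carry the same three parameters, so a Flux query can refer to params.start, params.field, and params.value either way.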
:warning: Parameterized Queries are supported only in InfluxDB Cloud. There is currently no support in InfluxDB OSS.
Parameterized query example:
package main
import (
"context"
"fmt"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
queryAPI := client.QueryAPI("my-org")
parameters := struct {
Start string `json:"start"`
Field string `json:"field"`
Value float64 `json:"value"`
}{
"-1h",
"temperature",
25,
}
query := `from(bucket:"my-bucket")
|> range(start: duration(params.start))
|> filter(fn: (r) => r._measurement == "stat")
|> filter(fn: (r) => r._field == params.field)
|> filter(fn: (r) => r._value > params.value)`
result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
if err == nil {
for result.Next() {
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("value: %v\n", result.Record().Value())
}
if result.Err() != nil {
fmt.Printf("query parsing error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
client.Close()
}
Concurrency
The InfluxDB Go Client can be used in a concurrent environment. All its functions are thread-safe.
The best practice is to use a single Client instance per server URL. This ensures optimized resource usage, most importantly the reuse of HTTP connections.
For efficient reuse of HTTP resources among multiple clients, create an HTTP client and set it on all clients using Options.SetHTTPClient():
httpClient := &http.Client{
Timeout: time.Second * time.Duration(60),
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
},
}
client1 := influxdb2.NewClientWithOptions("https://server:8086", "my-token", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
client2 := influxdb2.NewClientWithOptions("https://server:9999", "my-token2", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
The client ensures that there is a single instance of each server API sub-client for a specific area, e.g. a single WriteAPI instance for each org/bucket pair and a single QueryAPI for each org.
Such a single API sub-client instance can be used concurrently:
package main
import (
"math/rand"
"sync"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
func main() {
client := influxdb2.NewClient("http://localhost:8086", "my-token")
defer client.Close()
writeApi := client.WriteAPI("my-org", "my-bucket")
pointsCh := make(chan *write.Point, 200)
threads := 5
var wg sync.WaitGroup
go func(points int) {
for i := 0; i < points; i++ {
p := influxdb2.NewPoint("meas",
map[string]string{"tag": "tagvalue"},
map[string]interface{}{"val1": rand.Int63n(1000), "val2": rand.Float64()*100.0 - 50.0},
time.Now())
pointsCh <- p
}
close(pointsCh)
}(1000000)
for t := 0; t < threads; t++ {
wg.Add(1)
go func() {
for p := range pointsCh {
writeApi.WritePoint(p)
}
wg.Done()
}()
}
wg.Wait()
}
Proxy and redirects
You can configure the InfluxDB Go client to work behind a proxy in two ways:
- Using an environment variable
  Set the environment variable HTTP_PROXY (or HTTPS_PROXY, depending on the scheme of your server URL),
  e.g. on Linux: export HTTP_PROXY=http://my-proxy:8080
  or in Go code: os.Setenv("HTTP_PROXY","http://my-proxy:8080")
- Configuring http.Client to use a proxy
  Create a custom http.Client with a proxy configuration:
proxyUrl, err := url.Parse("http://my-proxy:8080")
if err != nil {
panic(err)
}
httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyUrl),
},
}
client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
The client automatically follows HTTP redirects. The default redirect policy is to follow up to 10 consecutive redirects.
For security reasons, the Authorization header is not forwarded when a redirect leads to a different domain.
To overcome this limitation, set a custom redirect handler:
token := "my-token"
httpClient := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
req.Header.Set("Authorization", "Token "+token)
return nil
},
}
client := influxdb2.NewClientWithOptions("http://localhost:8086", token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))
Checking Server State
There are three functions for checking whether a server is up and ready for communication:
| Function | Description | Availability |
|---|---|---|
| Health() | Detailed info about the server status, along with version string | OSS |
| Ready() | Server uptime info | OSS |
| Ping() | Whether a server is up | OSS, Cloud |
Only the Ping() function works in InfluxDB Cloud server.
InfluxDB 1.8 API compatibility
InfluxDB 1.8.0 introduced forward compatibility APIs for InfluxDB 2.0. These allow you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.
Client API usage differences summary:
- Use the form username:password for an authentication token. Example: my-user:my-password. Use an empty string ("") if the server doesn't require authentication.
- The organization parameter is not used. Use an empty string ("") where necessary.
- Use the form database/retention-policy where a bucket is required. Skip the retention policy if the default retention policy should be used. Examples: telegraf/autogen, telegraf.
The following forward compatible APIs are available:
Example
package main
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb-client-go/v2"
)
func main() {
userName := "my-user"
password := "my-password"
client := influxdb2.NewClient("http://localhost:8086", fmt.Sprintf("%s:%s",userName, password))
writeAPI := client.WriteAPIBlocking("", "test/autogen")
p := influxdb2.NewPoint("stat",
map[string]string{"unit": "temperature"},
map[string]interface{}{"avg": 24.5, "max": 45.0},
time.Now())
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
fmt.Printf("Write error: %s\n", err.Error())
}
queryAPI := client.QueryAPI("")
result, err := queryAPI.Query(context.Background(), `from(bucket:"test")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
if err == nil {
for result.Next() {
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
fmt.Printf("row: %s\n", result.Record().String())
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
fmt.Printf("Query error: %s\n", err.Error())
}
client.Close()
}
Contributing
If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request to the master branch.
License
The InfluxDB 2 Go Client is released under the MIT License.