github.com/gradientzero/comby-store-postgres
Advanced tools
@@ -8,2 +8,3 @@ package store | ||
| "fmt" | ||
| "strings" | ||
@@ -72,2 +73,6 @@ "github.com/gradientzero/comby-store-postgres/internal" | ||
| // PostgreSQL supports full MVCC — concurrent reads and writes are safe. | ||
| db.SetMaxIdleConns(10) | ||
| db.SetMaxOpenConns(100) | ||
| // otherwise, return the connection | ||
@@ -93,2 +98,8 @@ return db, nil | ||
| ); | ||
| CREATE UNIQUE INDEX IF NOT EXISTS "cmd_uuid_index" ON "commands" ( | ||
| "uuid" ASC | ||
| ); | ||
| CREATE INDEX IF NOT EXISTS "cmd_created_at_index" ON "commands" ( | ||
| "created_at" ASC | ||
| ); | ||
| ` | ||
@@ -162,7 +173,11 @@ _, err := cs.db.ExecContext(ctx, query) | ||
| } | ||
| defer func() { | ||
| if err != nil { | ||
| tx.Rollback() | ||
| } | ||
| }() | ||
| // prepare statement | ||
| query := `INSERT INTO commands ( | ||
| instance_id, | ||
| uuid, | ||
| instance_id, | ||
| uuid, | ||
| tenant_uuid, | ||
@@ -175,10 +190,6 @@ domain, | ||
| ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8);` | ||
| stmt, err := tx.Prepare(query) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // execute statement | ||
| _, err = stmt.ExecContext( | ||
| _, err = tx.ExecContext( | ||
| ctx, | ||
| query, | ||
| dbRecord.InstanceId, | ||
@@ -197,9 +208,2 @@ dbRecord.Uuid, | ||
| // close statement | ||
| err = stmt.Close() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // commit statement | ||
| return tx.Commit() | ||
@@ -216,10 +220,10 @@ } | ||
| // prepare query | ||
| var query string = "SELECT * FROM commands LIMIT 1;" | ||
| if len(getOpts.CommandUuid) > 0 { | ||
| query = fmt.Sprintf("SELECT * FROM commands WHERE uuid='%s' LIMIT 1;", getOpts.CommandUuid) | ||
| if len(getOpts.CommandUuid) == 0 { | ||
| return nil, fmt.Errorf("'%s' failed to get command - command uuid is required", cs.String()) | ||
| } | ||
| // run query (no args to not using prepared statement) | ||
| row := cs.db.QueryRowContext(ctx, query) | ||
| query := `SELECT id, instance_id, uuid, tenant_uuid, domain, | ||
| created_at, data_type, data_bytes, req_ctx | ||
| FROM commands WHERE uuid=$1 LIMIT 1;` | ||
| row := cs.db.QueryRowContext(ctx, query, getOpts.CommandUuid) | ||
| if row.Err() != nil { | ||
@@ -281,33 +285,36 @@ return nil, row.Err() | ||
| // prepare statement: (do NOT used them for Query/QueryContext) | ||
| // 1. see different syntax for postgres: | ||
| // http://go-database-sql.org/prepared.html#parameter-placeholder-syntax | ||
| // 2. db.Query and db.QueryContext for some reason it does not work as expected | ||
| // (seems to be something internally in database/sql because for SQLite and Postgres | ||
| // simply does not return the expected result after sending new values to prepared statement) | ||
| // Build WHERE clause with parameterized queries | ||
| var whereSQL string = "" | ||
| var whereList []string = []string{} | ||
| var args []any | ||
| paramIdx := 0 | ||
| if len(listOpts.TenantUuid) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("tenant_uuid='%s'", listOpts.TenantUuid)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("tenant_uuid=$%d", paramIdx)) | ||
| args = append(args, listOpts.TenantUuid) | ||
| } | ||
| if len(listOpts.Domain) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("domain='%s'", listOpts.Domain)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("domain=$%d", paramIdx)) | ||
| args = append(args, listOpts.Domain) | ||
| } | ||
| if len(listOpts.DataType) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("data_type='%s'", listOpts.DataType)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("data_type=$%d", paramIdx)) | ||
| args = append(args, listOpts.DataType) | ||
| } | ||
| if listOpts.Before >= 0 { | ||
| whereList = append(whereList, fmt.Sprintf("created_at<%d", listOpts.Before)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("created_at<$%d", paramIdx)) | ||
| args = append(args, listOpts.Before) | ||
| } | ||
| if listOpts.After >= 0 { | ||
| whereList = append(whereList, fmt.Sprintf("created_at>%d", listOpts.After)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("created_at>$%d", paramIdx)) | ||
| args = append(args, listOpts.After) | ||
| } | ||
| // note the first empty character(s) below | ||
| for index, where := range whereList { | ||
| if index == 0 { | ||
| whereSQL = fmt.Sprintf(" WHERE %s", where) | ||
| } else { | ||
| whereSQL = fmt.Sprintf("%s AND %s", whereSQL, where) | ||
| } | ||
| if len(whereList) > 0 { | ||
| whereSQL = " WHERE " + strings.Join(whereList, " AND ") | ||
| } | ||
@@ -318,3 +325,8 @@ | ||
| var queryTotalQuery string = fmt.Sprintf("SELECT COUNT(id) FROM commands%s;", whereSQL) | ||
| row := cs.db.QueryRowContext(ctx, queryTotalQuery) | ||
| var row *sql.Row | ||
| if len(args) > 0 { | ||
| row = cs.db.QueryRowContext(ctx, queryTotalQuery, args...) | ||
| } else { | ||
| row = cs.db.QueryRowContext(ctx, queryTotalQuery) | ||
| } | ||
| if err := row.Err(); err != nil { | ||
@@ -348,5 +360,11 @@ return nil, 0, err | ||
| // run query (no args to not using prepared statement - see above for more info) | ||
| var query string = fmt.Sprintf("SELECT * FROM commands%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL) | ||
| rows, err := cs.db.QueryContext(ctx, query) | ||
| // run query with parameterized values | ||
| var query string = fmt.Sprintf("SELECT id, instance_id, uuid, tenant_uuid, domain, created_at, data_type, data_bytes, req_ctx FROM commands%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL) | ||
| var rows *sql.Rows | ||
| var err error | ||
| if len(args) > 0 { | ||
| rows, err = cs.db.QueryContext(ctx, query, args...) | ||
| } else { | ||
| rows, err = cs.db.QueryContext(ctx, query) | ||
| } | ||
| switch { | ||
@@ -443,6 +461,10 @@ case err == sql.ErrNoRows: | ||
| } | ||
| defer func() { | ||
| if err != nil { | ||
| tx.Rollback() | ||
| } | ||
| }() | ||
| // prepare statement | ||
| query := `UPDATE commands SET | ||
| instance_id=$1, | ||
| instance_id=$1, | ||
| tenant_uuid=$2, | ||
@@ -455,9 +477,5 @@ domain=$3, | ||
| WHERE uuid=$8;` | ||
| stmt, err := tx.Prepare(query) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // execute statement | ||
| _, err = stmt.ExecContext(ctx, | ||
| _, err = tx.ExecContext(ctx, | ||
| query, | ||
| dbRecord.InstanceId, | ||
@@ -475,9 +493,2 @@ dbRecord.TenantUuid, | ||
| // close statement | ||
| err = stmt.Close() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // commit statement | ||
| return tx.Commit() | ||
@@ -501,5 +512,5 @@ } | ||
| // run query (no args to not using prepared statement) | ||
| query := fmt.Sprintf("DELETE FROM commands WHERE uuid='%s';", commandUuid) | ||
| _, err := cs.db.ExecContext(ctx, query) | ||
| // run query with parameterized values | ||
| query := "DELETE FROM commands WHERE uuid=$1;" | ||
| _, err := cs.db.ExecContext(ctx, query, commandUuid) | ||
| return err | ||
@@ -536,4 +547,3 @@ } | ||
| // run extra total query (no args to not using prepared statement) | ||
| var totalQuery string = fmt.Sprintf("SELECT COUNT(uuid) FROM commands;") | ||
| row := cs.db.QueryRowContext(ctx, totalQuery) | ||
| row := cs.db.QueryRowContext(ctx, "SELECT COUNT(uuid) FROM commands;") | ||
| if err := row.Err(); err != nil { | ||
@@ -549,4 +559,3 @@ return nil, err | ||
| // run extra total query (no args to not using prepared statement) | ||
| var lastEventQuery string = fmt.Sprintf("SELECT COALESCE(MAX(created_at), 0) FROM commands;") | ||
| row = cs.db.QueryRowContext(ctx, lastEventQuery) | ||
| row = cs.db.QueryRowContext(ctx, "SELECT COALESCE(MAX(created_at), 0) FROM commands;") | ||
| if err := row.Err(); err != nil { | ||
@@ -553,0 +562,0 @@ return nil, err |
+99
-69
@@ -8,2 +8,3 @@ package store | ||
| "fmt" | ||
| "strings" | ||
@@ -71,4 +72,5 @@ "github.com/gradientzero/comby-store-postgres/internal" | ||
| db.SetMaxIdleConns(1) | ||
| db.SetMaxOpenConns(1) | ||
| // PostgreSQL supports full MVCC — concurrent reads and writes are safe. | ||
| db.SetMaxIdleConns(10) | ||
| db.SetMaxOpenConns(100) | ||
@@ -100,2 +102,8 @@ // otherwise, return the connection | ||
| ); | ||
| CREATE UNIQUE INDEX IF NOT EXISTS "uuid_index" ON "events" ( | ||
| "uuid" ASC | ||
| ); | ||
| CREATE INDEX IF NOT EXISTS "created_at_index" ON "events" ( | ||
| "created_at" ASC | ||
| ); | ||
| ` | ||
@@ -170,7 +178,11 @@ _, err := es.db.ExecContext(ctx, query) | ||
| } | ||
| defer func() { | ||
| if err != nil { | ||
| tx.Rollback() | ||
| } | ||
| }() | ||
| // prepare statement | ||
| query := `INSERT INTO events ( | ||
| instance_id, | ||
| uuid, | ||
| instance_id, | ||
| uuid, | ||
| tenant_uuid, | ||
@@ -185,10 +197,6 @@ command_uuid, | ||
| ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);` | ||
| stmt, err := tx.Prepare(query) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // execute statement | ||
| _, err = stmt.ExecContext( | ||
| _, err = tx.ExecContext( | ||
| ctx, | ||
| query, | ||
| dbRecord.InstanceId, | ||
@@ -209,9 +217,2 @@ dbRecord.Uuid, | ||
| // close statement | ||
| err = stmt.Close() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // commit statement | ||
| return tx.Commit() | ||
@@ -228,10 +229,10 @@ } | ||
| // prepare query | ||
| var query string = "SELECT * FROM events LIMIT 1;" | ||
| if len(getOpts.EventUuid) > 0 { | ||
| query = fmt.Sprintf("SELECT * FROM events WHERE uuid='%s' LIMIT 1;", getOpts.EventUuid) | ||
| if len(getOpts.EventUuid) == 0 { | ||
| return nil, fmt.Errorf("'%s' failed to get event - event uuid is required", es.String()) | ||
| } | ||
| // run query (no args to not using prepared statement) | ||
| row := es.db.QueryRowContext(ctx, query) | ||
| query := `SELECT id, instance_id, uuid, tenant_uuid, command_uuid, domain, | ||
| aggregate_uuid, version, created_at, data_type, data_bytes | ||
| FROM events WHERE uuid=$1 LIMIT 1;` | ||
| row := es.db.QueryRowContext(ctx, query, getOpts.EventUuid) | ||
| if row.Err() != nil { | ||
@@ -303,27 +304,37 @@ return nil, row.Err() | ||
| var whereList []string = []string{} | ||
| var args []any | ||
| paramIdx := 0 | ||
| if len(listOpts.TenantUuid) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("tenant_uuid='%s'", listOpts.TenantUuid)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("tenant_uuid=$%d", paramIdx)) | ||
| args = append(args, listOpts.TenantUuid) | ||
| } | ||
| if len(listOpts.AggregateUuid) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("aggregate_uuid='%s'", listOpts.AggregateUuid)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("aggregate_uuid=$%d", paramIdx)) | ||
| args = append(args, listOpts.AggregateUuid) | ||
| } | ||
| if len(listOpts.DataType) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("data_type='%s'", listOpts.DataType)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("data_type=$%d", paramIdx)) | ||
| args = append(args, listOpts.DataType) | ||
| } | ||
| if len(listOpts.Domains) > 0 { | ||
| inStr := "" | ||
| for index, _domain := range listOpts.Domains { | ||
| inStr += fmt.Sprintf("'%s'", _domain) | ||
| if len(listOpts.Domains) > 1 && index < len(listOpts.Domains)-1 { | ||
| inStr = fmt.Sprintf("%s, ", inStr) | ||
| } | ||
| placeholders := make([]string, len(listOpts.Domains)) | ||
| for i, d := range listOpts.Domains { | ||
| paramIdx++ | ||
| placeholders[i] = fmt.Sprintf("$%d", paramIdx) | ||
| args = append(args, d) | ||
| } | ||
| stmt := fmt.Sprintf("domain IN (%s)", inStr) | ||
| whereList = append(whereList, stmt) | ||
| whereList = append(whereList, fmt.Sprintf("domain IN (%s)", strings.Join(placeholders, ","))) | ||
| } | ||
| if listOpts.Before >= 0 { | ||
| whereList = append(whereList, fmt.Sprintf("created_at<%d", listOpts.Before)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("created_at<$%d", paramIdx)) | ||
| args = append(args, listOpts.Before) | ||
| } | ||
| if listOpts.After >= 0 { | ||
| whereList = append(whereList, fmt.Sprintf("created_at>%d", listOpts.After)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("created_at>$%d", paramIdx)) | ||
| args = append(args, listOpts.After) | ||
| } | ||
@@ -343,3 +354,8 @@ | ||
| var queryTotalQuery string = fmt.Sprintf("SELECT COUNT(id) FROM events%s;", whereSQL) | ||
| row := es.db.QueryRowContext(ctx, queryTotalQuery) | ||
| var row *sql.Row | ||
| if len(args) > 0 { | ||
| row = es.db.QueryRowContext(ctx, queryTotalQuery, args...) | ||
| } else { | ||
| row = es.db.QueryRowContext(ctx, queryTotalQuery) | ||
| } | ||
| if err := row.Err(); err != nil { | ||
@@ -373,5 +389,11 @@ return nil, 0, err | ||
| // run query (no args to not using prepared statement - see above for more info) | ||
| var query string = fmt.Sprintf("SELECT * FROM events%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL) | ||
| rows, err := es.db.QueryContext(ctx, query) | ||
| // run query with parameterized values | ||
| var query string = fmt.Sprintf("SELECT id, instance_id, uuid, tenant_uuid, command_uuid, domain, aggregate_uuid, version, created_at, data_type, data_bytes FROM events%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL) | ||
| var rows *sql.Rows | ||
| var err error | ||
| if len(args) > 0 { | ||
| rows, err = es.db.QueryContext(ctx, query, args...) | ||
| } else { | ||
| rows, err = es.db.QueryContext(ctx, query) | ||
| } | ||
| switch { | ||
@@ -471,6 +493,10 @@ case err == sql.ErrNoRows: | ||
| } | ||
| defer func() { | ||
| if err != nil { | ||
| tx.Rollback() | ||
| } | ||
| }() | ||
| // prepare statement | ||
| query := `UPDATE events SET | ||
| instance_id=$1, | ||
| instance_id=$1, | ||
| tenant_uuid=$2, | ||
@@ -485,9 +511,5 @@ command_uuid=$3, | ||
| WHERE uuid=$10;` | ||
| stmt, err := tx.Prepare(query) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // execute statement | ||
| _, err = stmt.ExecContext(ctx, | ||
| _, err = tx.ExecContext(ctx, | ||
| query, | ||
| dbRecord.InstanceId, | ||
@@ -507,9 +529,2 @@ dbRecord.TenantUuid, | ||
| // close statement | ||
| err = stmt.Close() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // commit statement | ||
| return tx.Commit() | ||
@@ -534,5 +549,5 @@ } | ||
| // run query (no args to not using prepared statement) | ||
| query := fmt.Sprintf("DELETE FROM events WHERE uuid='%s';", eventUuid) | ||
| _, err := es.db.ExecContext(ctx, query) | ||
| // run query with parameterized values | ||
| query := "DELETE FROM events WHERE uuid=$1;" | ||
| _, err := es.db.ExecContext(ctx, query, eventUuid) | ||
| return err | ||
@@ -571,7 +586,13 @@ } | ||
| var whereList []string = []string{} | ||
| var args []any | ||
| paramIdx := 0 | ||
| if len(listOpts.TenantUuid) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("tenant_uuid='%s'", listOpts.TenantUuid)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("tenant_uuid=$%d", paramIdx)) | ||
| args = append(args, listOpts.TenantUuid) | ||
| } | ||
| if len(listOpts.Domain) > 0 { | ||
| whereList = append(whereList, fmt.Sprintf("domain='%s'", listOpts.Domain)) | ||
| paramIdx++ | ||
| whereList = append(whereList, fmt.Sprintf("domain=$%d", paramIdx)) | ||
| args = append(args, listOpts.Domain) | ||
| } | ||
@@ -608,5 +629,11 @@ | ||
| // run query (no args to not using prepared statement) | ||
| // run query with parameterized values | ||
| var query string = fmt.Sprintf("SELECT DISTINCT %s FROM events%s%s%s%s;", listOpts.DbField, whereSQL, orderBySQL, limitSQL, offsetSQL) | ||
| rows, err := es.db.QueryContext(ctx, query) | ||
| var rows *sql.Rows | ||
| var err error | ||
| if len(args) > 0 { | ||
| rows, err = es.db.QueryContext(ctx, query, args...) | ||
| } else { | ||
| rows, err = es.db.QueryContext(ctx, query) | ||
| } | ||
| switch { | ||
@@ -638,5 +665,10 @@ case err == sql.ErrNoRows: | ||
| // run extra total query (no args to not using prepared statement) | ||
| // run extra total query with parameterized values | ||
| var totalQuery string = fmt.Sprintf("SELECT COUNT(DISTINCT %s) FROM events%s;", listOpts.DbField, whereSQL) | ||
| row := es.db.QueryRowContext(ctx, totalQuery) | ||
| var row *sql.Row | ||
| if len(args) > 0 { | ||
| row = es.db.QueryRowContext(ctx, totalQuery, args...) | ||
| } else { | ||
| row = es.db.QueryRowContext(ctx, totalQuery) | ||
| } | ||
| if err := row.Err(); err != nil { | ||
@@ -669,4 +701,3 @@ return nil, 0, err | ||
| // run extra total query (no args to not using prepared statement) | ||
| var totalQuery string = fmt.Sprintf("SELECT COUNT(uuid) FROM events;") | ||
| row := es.db.QueryRowContext(ctx, totalQuery) | ||
| row := es.db.QueryRowContext(ctx, "SELECT COUNT(uuid) FROM events;") | ||
| if err := row.Err(); err != nil { | ||
@@ -682,4 +713,3 @@ return nil, err | ||
| // run extra total query (no args to not using prepared statement) | ||
| var lastEventQuery string = fmt.Sprintf("SELECT COALESCE(MAX(created_at), 0) FROM events;") | ||
| row = es.db.QueryRowContext(ctx, lastEventQuery) | ||
| row = es.db.QueryRowContext(ctx, "SELECT COALESCE(MAX(created_at), 0) FROM events;") | ||
| if err := row.Err(); err != nil { | ||
@@ -686,0 +716,0 @@ return nil, err |