You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

github.com/gradientzero/comby-store-postgres

Package Overview
Dependencies
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/gradientzero/comby-store-postgres - go Package Compare versions

Comparing version
v1.0.9
to
v1.0.10
+74
-65
command.store.postgres.go

@@ -8,2 +8,3 @@ package store

"fmt"
"strings"

@@ -72,2 +73,6 @@ "github.com/gradientzero/comby-store-postgres/internal"

// PostgreSQL supports full MVCC — concurrent reads and writes are safe.
db.SetMaxIdleConns(10)
db.SetMaxOpenConns(100)
// otherwise, return the connection

@@ -93,2 +98,8 @@ return db, nil

);
CREATE UNIQUE INDEX IF NOT EXISTS "cmd_uuid_index" ON "commands" (
"uuid" ASC
);
CREATE INDEX IF NOT EXISTS "cmd_created_at_index" ON "commands" (
"created_at" ASC
);
`

@@ -162,7 +173,11 @@ _, err := cs.db.ExecContext(ctx, query)

}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// prepare statement
query := `INSERT INTO commands (
instance_id,
uuid,
instance_id,
uuid,
tenant_uuid,

@@ -175,10 +190,6 @@ domain,

) VALUES ($1,$2,$3,$4,$5,$6,$7,$8);`
stmt, err := tx.Prepare(query)
if err != nil {
return err
}
// execute statement
_, err = stmt.ExecContext(
_, err = tx.ExecContext(
ctx,
query,
dbRecord.InstanceId,

@@ -197,9 +208,2 @@ dbRecord.Uuid,

// close statement
err = stmt.Close()
if err != nil {
return err
}
// commit statement
return tx.Commit()

@@ -216,10 +220,10 @@ }

// prepare query
var query string = "SELECT * FROM commands LIMIT 1;"
if len(getOpts.CommandUuid) > 0 {
query = fmt.Sprintf("SELECT * FROM commands WHERE uuid='%s' LIMIT 1;", getOpts.CommandUuid)
if len(getOpts.CommandUuid) == 0 {
return nil, fmt.Errorf("'%s' failed to get command - command uuid is required", cs.String())
}
// run query (no args to not using prepared statement)
row := cs.db.QueryRowContext(ctx, query)
query := `SELECT id, instance_id, uuid, tenant_uuid, domain,
created_at, data_type, data_bytes, req_ctx
FROM commands WHERE uuid=$1 LIMIT 1;`
row := cs.db.QueryRowContext(ctx, query, getOpts.CommandUuid)
if row.Err() != nil {

@@ -281,33 +285,36 @@ return nil, row.Err()

// prepare statement: (do NOT used them for Query/QueryContext)
// 1. see different syntax for postgres:
// http://go-database-sql.org/prepared.html#parameter-placeholder-syntax
// 2. db.Query and db.QueryContext for some reason it does not work as expected
// (seems to be something internally in database/sql because for SQLite and Postgres
// simply does not return the expected result after sending new values to prepared statement)
// Build WHERE clause with parameterized queries
var whereSQL string = ""
var whereList []string = []string{}
var args []any
paramIdx := 0
if len(listOpts.TenantUuid) > 0 {
whereList = append(whereList, fmt.Sprintf("tenant_uuid='%s'", listOpts.TenantUuid))
paramIdx++
whereList = append(whereList, fmt.Sprintf("tenant_uuid=$%d", paramIdx))
args = append(args, listOpts.TenantUuid)
}
if len(listOpts.Domain) > 0 {
whereList = append(whereList, fmt.Sprintf("domain='%s'", listOpts.Domain))
paramIdx++
whereList = append(whereList, fmt.Sprintf("domain=$%d", paramIdx))
args = append(args, listOpts.Domain)
}
if len(listOpts.DataType) > 0 {
whereList = append(whereList, fmt.Sprintf("data_type='%s'", listOpts.DataType))
paramIdx++
whereList = append(whereList, fmt.Sprintf("data_type=$%d", paramIdx))
args = append(args, listOpts.DataType)
}
if listOpts.Before >= 0 {
whereList = append(whereList, fmt.Sprintf("created_at<%d", listOpts.Before))
paramIdx++
whereList = append(whereList, fmt.Sprintf("created_at<$%d", paramIdx))
args = append(args, listOpts.Before)
}
if listOpts.After >= 0 {
whereList = append(whereList, fmt.Sprintf("created_at>%d", listOpts.After))
paramIdx++
whereList = append(whereList, fmt.Sprintf("created_at>$%d", paramIdx))
args = append(args, listOpts.After)
}
// note the first empty character(s) below
for index, where := range whereList {
if index == 0 {
whereSQL = fmt.Sprintf(" WHERE %s", where)
} else {
whereSQL = fmt.Sprintf("%s AND %s", whereSQL, where)
}
if len(whereList) > 0 {
whereSQL = " WHERE " + strings.Join(whereList, " AND ")
}

@@ -318,3 +325,8 @@

var queryTotalQuery string = fmt.Sprintf("SELECT COUNT(id) FROM commands%s;", whereSQL)
row := cs.db.QueryRowContext(ctx, queryTotalQuery)
var row *sql.Row
if len(args) > 0 {
row = cs.db.QueryRowContext(ctx, queryTotalQuery, args...)
} else {
row = cs.db.QueryRowContext(ctx, queryTotalQuery)
}
if err := row.Err(); err != nil {

@@ -348,5 +360,11 @@ return nil, 0, err

// run query (no args to not using prepared statement - see above for more info)
var query string = fmt.Sprintf("SELECT * FROM commands%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL)
rows, err := cs.db.QueryContext(ctx, query)
// run query with parameterized values
var query string = fmt.Sprintf("SELECT id, instance_id, uuid, tenant_uuid, domain, created_at, data_type, data_bytes, req_ctx FROM commands%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL)
var rows *sql.Rows
var err error
if len(args) > 0 {
rows, err = cs.db.QueryContext(ctx, query, args...)
} else {
rows, err = cs.db.QueryContext(ctx, query)
}
switch {

@@ -443,6 +461,10 @@ case err == sql.ErrNoRows:

}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// prepare statement
query := `UPDATE commands SET
instance_id=$1,
instance_id=$1,
tenant_uuid=$2,

@@ -455,9 +477,5 @@ domain=$3,

WHERE uuid=$8;`
stmt, err := tx.Prepare(query)
if err != nil {
return err
}
// execute statement
_, err = stmt.ExecContext(ctx,
_, err = tx.ExecContext(ctx,
query,
dbRecord.InstanceId,

@@ -475,9 +493,2 @@ dbRecord.TenantUuid,

// close statement
err = stmt.Close()
if err != nil {
return err
}
// commit statement
return tx.Commit()

@@ -501,5 +512,5 @@ }

// run query (no args to not using prepared statement)
query := fmt.Sprintf("DELETE FROM commands WHERE uuid='%s';", commandUuid)
_, err := cs.db.ExecContext(ctx, query)
// run query with parameterized values
query := "DELETE FROM commands WHERE uuid=$1;"
_, err := cs.db.ExecContext(ctx, query, commandUuid)
return err

@@ -536,4 +547,3 @@ }

// run extra total query (no args to not using prepared statement)
var totalQuery string = fmt.Sprintf("SELECT COUNT(uuid) FROM commands;")
row := cs.db.QueryRowContext(ctx, totalQuery)
row := cs.db.QueryRowContext(ctx, "SELECT COUNT(uuid) FROM commands;")
if err := row.Err(); err != nil {

@@ -549,4 +559,3 @@ return nil, err

// run extra total query (no args to not using prepared statement)
var lastEventQuery string = fmt.Sprintf("SELECT COALESCE(MAX(created_at), 0) FROM commands;")
row = cs.db.QueryRowContext(ctx, lastEventQuery)
row = cs.db.QueryRowContext(ctx, "SELECT COALESCE(MAX(created_at), 0) FROM commands;")
if err := row.Err(); err != nil {

@@ -553,0 +562,0 @@ return nil, err

@@ -8,2 +8,3 @@ package store

"fmt"
"strings"

@@ -71,4 +72,5 @@ "github.com/gradientzero/comby-store-postgres/internal"

db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
// PostgreSQL supports full MVCC — concurrent reads and writes are safe.
db.SetMaxIdleConns(10)
db.SetMaxOpenConns(100)

@@ -100,2 +102,8 @@ // otherwise, return the connection

);
CREATE UNIQUE INDEX IF NOT EXISTS "uuid_index" ON "events" (
"uuid" ASC
);
CREATE INDEX IF NOT EXISTS "created_at_index" ON "events" (
"created_at" ASC
);
`

@@ -170,7 +178,11 @@ _, err := es.db.ExecContext(ctx, query)

}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// prepare statement
query := `INSERT INTO events (
instance_id,
uuid,
instance_id,
uuid,
tenant_uuid,

@@ -185,10 +197,6 @@ command_uuid,

) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);`
stmt, err := tx.Prepare(query)
if err != nil {
return err
}
// execute statement
_, err = stmt.ExecContext(
_, err = tx.ExecContext(
ctx,
query,
dbRecord.InstanceId,

@@ -209,9 +217,2 @@ dbRecord.Uuid,

// close statement
err = stmt.Close()
if err != nil {
return err
}
// commit statement
return tx.Commit()

@@ -228,10 +229,10 @@ }

// prepare query
var query string = "SELECT * FROM events LIMIT 1;"
if len(getOpts.EventUuid) > 0 {
query = fmt.Sprintf("SELECT * FROM events WHERE uuid='%s' LIMIT 1;", getOpts.EventUuid)
if len(getOpts.EventUuid) == 0 {
return nil, fmt.Errorf("'%s' failed to get event - event uuid is required", es.String())
}
// run query (no args to not using prepared statement)
row := es.db.QueryRowContext(ctx, query)
query := `SELECT id, instance_id, uuid, tenant_uuid, command_uuid, domain,
aggregate_uuid, version, created_at, data_type, data_bytes
FROM events WHERE uuid=$1 LIMIT 1;`
row := es.db.QueryRowContext(ctx, query, getOpts.EventUuid)
if row.Err() != nil {

@@ -303,27 +304,37 @@ return nil, row.Err()

var whereList []string = []string{}
var args []any
paramIdx := 0
if len(listOpts.TenantUuid) > 0 {
whereList = append(whereList, fmt.Sprintf("tenant_uuid='%s'", listOpts.TenantUuid))
paramIdx++
whereList = append(whereList, fmt.Sprintf("tenant_uuid=$%d", paramIdx))
args = append(args, listOpts.TenantUuid)
}
if len(listOpts.AggregateUuid) > 0 {
whereList = append(whereList, fmt.Sprintf("aggregate_uuid='%s'", listOpts.AggregateUuid))
paramIdx++
whereList = append(whereList, fmt.Sprintf("aggregate_uuid=$%d", paramIdx))
args = append(args, listOpts.AggregateUuid)
}
if len(listOpts.DataType) > 0 {
whereList = append(whereList, fmt.Sprintf("data_type='%s'", listOpts.DataType))
paramIdx++
whereList = append(whereList, fmt.Sprintf("data_type=$%d", paramIdx))
args = append(args, listOpts.DataType)
}
if len(listOpts.Domains) > 0 {
inStr := ""
for index, _domain := range listOpts.Domains {
inStr += fmt.Sprintf("'%s'", _domain)
if len(listOpts.Domains) > 1 && index < len(listOpts.Domains)-1 {
inStr = fmt.Sprintf("%s, ", inStr)
}
placeholders := make([]string, len(listOpts.Domains))
for i, d := range listOpts.Domains {
paramIdx++
placeholders[i] = fmt.Sprintf("$%d", paramIdx)
args = append(args, d)
}
stmt := fmt.Sprintf("domain IN (%s)", inStr)
whereList = append(whereList, stmt)
whereList = append(whereList, fmt.Sprintf("domain IN (%s)", strings.Join(placeholders, ",")))
}
if listOpts.Before >= 0 {
whereList = append(whereList, fmt.Sprintf("created_at<%d", listOpts.Before))
paramIdx++
whereList = append(whereList, fmt.Sprintf("created_at<$%d", paramIdx))
args = append(args, listOpts.Before)
}
if listOpts.After >= 0 {
whereList = append(whereList, fmt.Sprintf("created_at>%d", listOpts.After))
paramIdx++
whereList = append(whereList, fmt.Sprintf("created_at>$%d", paramIdx))
args = append(args, listOpts.After)
}

@@ -343,3 +354,8 @@

var queryTotalQuery string = fmt.Sprintf("SELECT COUNT(id) FROM events%s;", whereSQL)
row := es.db.QueryRowContext(ctx, queryTotalQuery)
var row *sql.Row
if len(args) > 0 {
row = es.db.QueryRowContext(ctx, queryTotalQuery, args...)
} else {
row = es.db.QueryRowContext(ctx, queryTotalQuery)
}
if err := row.Err(); err != nil {

@@ -373,5 +389,11 @@ return nil, 0, err

// run query (no args to not using prepared statement - see above for more info)
var query string = fmt.Sprintf("SELECT * FROM events%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL)
rows, err := es.db.QueryContext(ctx, query)
// run query with parameterized values
var query string = fmt.Sprintf("SELECT id, instance_id, uuid, tenant_uuid, command_uuid, domain, aggregate_uuid, version, created_at, data_type, data_bytes FROM events%s%s%s%s;", whereSQL, orderBySQL, limitSQL, offsetSQL)
var rows *sql.Rows
var err error
if len(args) > 0 {
rows, err = es.db.QueryContext(ctx, query, args...)
} else {
rows, err = es.db.QueryContext(ctx, query)
}
switch {

@@ -471,6 +493,10 @@ case err == sql.ErrNoRows:

}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// prepare statement
query := `UPDATE events SET
instance_id=$1,
instance_id=$1,
tenant_uuid=$2,

@@ -485,9 +511,5 @@ command_uuid=$3,

WHERE uuid=$10;`
stmt, err := tx.Prepare(query)
if err != nil {
return err
}
// execute statement
_, err = stmt.ExecContext(ctx,
_, err = tx.ExecContext(ctx,
query,
dbRecord.InstanceId,

@@ -507,9 +529,2 @@ dbRecord.TenantUuid,

// close statement
err = stmt.Close()
if err != nil {
return err
}
// commit statement
return tx.Commit()

@@ -534,5 +549,5 @@ }

// run query (no args to not using prepared statement)
query := fmt.Sprintf("DELETE FROM events WHERE uuid='%s';", eventUuid)
_, err := es.db.ExecContext(ctx, query)
// run query with parameterized values
query := "DELETE FROM events WHERE uuid=$1;"
_, err := es.db.ExecContext(ctx, query, eventUuid)
return err

@@ -571,7 +586,13 @@ }

var whereList []string = []string{}
var args []any
paramIdx := 0
if len(listOpts.TenantUuid) > 0 {
whereList = append(whereList, fmt.Sprintf("tenant_uuid='%s'", listOpts.TenantUuid))
paramIdx++
whereList = append(whereList, fmt.Sprintf("tenant_uuid=$%d", paramIdx))
args = append(args, listOpts.TenantUuid)
}
if len(listOpts.Domain) > 0 {
whereList = append(whereList, fmt.Sprintf("domain='%s'", listOpts.Domain))
paramIdx++
whereList = append(whereList, fmt.Sprintf("domain=$%d", paramIdx))
args = append(args, listOpts.Domain)
}

@@ -608,5 +629,11 @@

// run query (no args to not using prepared statement)
// run query with parameterized values
var query string = fmt.Sprintf("SELECT DISTINCT %s FROM events%s%s%s%s;", listOpts.DbField, whereSQL, orderBySQL, limitSQL, offsetSQL)
rows, err := es.db.QueryContext(ctx, query)
var rows *sql.Rows
var err error
if len(args) > 0 {
rows, err = es.db.QueryContext(ctx, query, args...)
} else {
rows, err = es.db.QueryContext(ctx, query)
}
switch {

@@ -638,5 +665,10 @@ case err == sql.ErrNoRows:

// run extra total query (no args to not using prepared statement)
// run extra total query with parameterized values
var totalQuery string = fmt.Sprintf("SELECT COUNT(DISTINCT %s) FROM events%s;", listOpts.DbField, whereSQL)
row := es.db.QueryRowContext(ctx, totalQuery)
var row *sql.Row
if len(args) > 0 {
row = es.db.QueryRowContext(ctx, totalQuery, args...)
} else {
row = es.db.QueryRowContext(ctx, totalQuery)
}
if err := row.Err(); err != nil {

@@ -669,4 +701,3 @@ return nil, 0, err

// run extra total query (no args to not using prepared statement)
var totalQuery string = fmt.Sprintf("SELECT COUNT(uuid) FROM events;")
row := es.db.QueryRowContext(ctx, totalQuery)
row := es.db.QueryRowContext(ctx, "SELECT COUNT(uuid) FROM events;")
if err := row.Err(); err != nil {

@@ -682,4 +713,3 @@ return nil, err

// run extra total query (no args to not using prepared statement)
var lastEventQuery string = fmt.Sprintf("SELECT COALESCE(MAX(created_at), 0) FROM events;")
row = es.db.QueryRowContext(ctx, lastEventQuery)
row = es.db.QueryRowContext(ctx, "SELECT COALESCE(MAX(created_at), 0) FROM events;")
if err := row.Err(); err != nil {

@@ -686,0 +716,0 @@ return nil, err