| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774 |
- package user
- import (
- "database/sql"
- "encoding/json"
- "errors"
- "fmt"
- _ "github.com/mattn/go-sqlite3" // SQLite driver
- "golang.org/x/crypto/bcrypt"
- "heckel.io/ntfy/log"
- "heckel.io/ntfy/util"
- "strings"
- "sync"
- "time"
- )
- const (
- bcryptCost = 10
- intentionalSlowDownHash = "$2a$10$YFCQvqQDwIIwnJM1xkAYOeih0dg17UVGanaTStnrSzC8NCWxcLDwy" // Cost should match bcryptCost
- userStatsQueueWriterInterval = 33 * time.Second
- tokenLength = 32
- tokenExpiryDuration = 72 * time.Hour // Extend tokens by this much
- tokenMaxCount = 10 // Only keep this many tokens in the table per user
- )
- var (
- errNoTokenProvided = errors.New("no token provided")
- errTopicOwnedByOthers = errors.New("topic owned by others")
- errNoRows = errors.New("no rows found")
- )
- // Manager-related queries
- const (
- createTablesQueriesNoTx = `
- CREATE TABLE IF NOT EXISTS plan (
- id INT NOT NULL,
- code TEXT NOT NULL,
- messages_limit INT NOT NULL,
- emails_limit INT NOT NULL,
- topics_limit INT NOT NULL,
- attachment_file_size_limit INT NOT NULL,
- attachment_total_size_limit INT NOT NULL,
- PRIMARY KEY (id)
- );
- CREATE TABLE IF NOT EXISTS user (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- plan_id INT,
- user TEXT NOT NULL,
- pass TEXT NOT NULL,
- role TEXT NOT NULL,
- messages INT NOT NULL DEFAULT (0),
- emails INT NOT NULL DEFAULT (0),
- settings JSON,
- FOREIGN KEY (plan_id) REFERENCES plan (id)
- );
- CREATE UNIQUE INDEX idx_user ON user (user);
- CREATE TABLE IF NOT EXISTS user_access (
- user_id INT NOT NULL,
- topic TEXT NOT NULL,
- read INT NOT NULL,
- write INT NOT NULL,
- owner_user_id INT,
- PRIMARY KEY (user_id, topic),
- FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
- FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
- );
- CREATE TABLE IF NOT EXISTS user_token (
- user_id INT NOT NULL,
- token TEXT NOT NULL,
- expires INT NOT NULL,
- PRIMARY KEY (user_id, token),
- FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
- );
- CREATE TABLE IF NOT EXISTS schemaVersion (
- id INT PRIMARY KEY,
- version INT NOT NULL
- );
- INSERT INTO user (id, user, pass, role) VALUES (1, '*', '', 'anonymous') ON CONFLICT (id) DO NOTHING;
- `
- createTablesQueries = `BEGIN; ` + createTablesQueriesNoTx + ` COMMIT;`
- builtinStartupQueries = `
- PRAGMA foreign_keys = ON;
- `
- selectUserByNameQuery = `
- SELECT u.user, u.pass, u.role, u.messages, u.emails, u.settings, p.code, p.messages_limit, p.emails_limit, p.topics_limit, p.attachment_file_size_limit, p.attachment_total_size_limit
- FROM user u
- LEFT JOIN plan p on p.id = u.plan_id
- WHERE user = ?
- `
- selectUserByTokenQuery = `
- SELECT u.user, u.pass, u.role, u.messages, u.emails, u.settings, p.code, p.messages_limit, p.emails_limit, p.topics_limit, p.attachment_file_size_limit, p.attachment_total_size_limit
- FROM user u
- JOIN user_token t on u.id = t.user_id
- LEFT JOIN plan p on p.id = u.plan_id
- WHERE t.token = ? AND t.expires >= ?
- `
- selectTopicPermsQuery = `
- SELECT read, write
- FROM user_access a
- JOIN user u ON u.id = a.user_id
- WHERE (u.user = ? OR u.user = ?) AND ? LIKE a.topic
- ORDER BY u.user DESC
- `
- insertUserQuery = `INSERT INTO user (user, pass, role) VALUES (?, ?, ?)`
- selectUsernamesQuery = `
- SELECT user
- FROM user
- ORDER BY
- CASE role
- WHEN 'admin' THEN 1
- WHEN 'anonymous' THEN 3
- ELSE 2
- END, user
- `
- updateUserPassQuery = `UPDATE user SET pass = ? WHERE user = ?`
- updateUserRoleQuery = `UPDATE user SET role = ? WHERE user = ?`
- updateUserSettingsQuery = `UPDATE user SET settings = ? WHERE user = ?`
- updateUserStatsQuery = `UPDATE user SET messages = ?, emails = ? WHERE user = ?`
- deleteUserQuery = `DELETE FROM user WHERE user = ?`
- upsertUserAccessQuery = `
- INSERT INTO user_access (user_id, topic, read, write, owner_user_id)
- VALUES ((SELECT id FROM user WHERE user = ?), ?, ?, ?, (SELECT IIF(?='',NULL,(SELECT id FROM user WHERE user=?))))
- ON CONFLICT (user_id, topic)
- DO UPDATE SET read=excluded.read, write=excluded.write, owner_user_id=excluded.owner_user_id
- `
- selectUserAccessQuery = `
- SELECT topic, read, write
- FROM user_access
- WHERE user_id = (SELECT id FROM user WHERE user = ?)
- ORDER BY write DESC, read DESC, topic
- `
- selectUserReservationsQuery = `
- SELECT a_user.topic, a_user.read, a_user.write, a_everyone.read AS everyone_read, a_everyone.write AS everyone_write
- FROM user_access a_user
- LEFT JOIN user_access a_everyone ON a_user.topic = a_everyone.topic AND a_everyone.user_id = (SELECT id FROM user WHERE user = ?)
- WHERE a_user.user_id = a_user.owner_user_id
- AND a_user.owner_user_id = (SELECT id FROM user WHERE user = ?)
- ORDER BY a_user.topic
- `
- selectOtherAccessCountQuery = `
- SELECT COUNT(*)
- FROM user_access
- WHERE (topic = ? OR ? LIKE topic)
- AND (owner_user_id IS NULL OR owner_user_id != (SELECT id FROM user WHERE user = ?))
- `
- deleteAllAccessQuery = `DELETE FROM user_access`
- deleteUserAccessQuery = `DELETE FROM user_access WHERE user_id = (SELECT id FROM user WHERE user = ?)`
- deleteTopicAccessQuery = `DELETE FROM user_access WHERE user_id = (SELECT id FROM user WHERE user = ?) AND topic = ?`
- selectTokenCountQuery = `SELECT COUNT(*) FROM user_token WHERE (SELECT id FROM user WHERE user = ?)`
- insertTokenQuery = `INSERT INTO user_token (user_id, token, expires) VALUES ((SELECT id FROM user WHERE user = ?), ?, ?)`
- updateTokenExpiryQuery = `UPDATE user_token SET expires = ? WHERE user_id = (SELECT id FROM user WHERE user = ?) AND token = ?`
- deleteTokenQuery = `DELETE FROM user_token WHERE user_id = (SELECT id FROM user WHERE user = ?) AND token = ?`
- deleteExpiredTokensQuery = `DELETE FROM user_token WHERE expires < ?`
- deleteExcessTokensQuery = `
- DELETE FROM user_token
- WHERE (user_id, token) NOT IN (
- SELECT user_id, token
- FROM user_token
- WHERE user_id = (SELECT id FROM user WHERE user = ?)
- ORDER BY expires DESC
- LIMIT ?
- )
- ;
- `
- )
- // Schema management queries
- const (
- currentSchemaVersion = 2
- insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
- updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
- selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
- // 1 -> 2 (complex migration!)
- migrate1To2RenameUserTableQueryNoTx = `
- ALTER TABLE user RENAME TO user_old;
- `
- migrate1To2InsertFromOldTablesAndDropNoTx = `
- INSERT INTO user (user, pass, role)
- SELECT user, pass, role FROM user_old;
- INSERT INTO user_access (user_id, topic, read, write)
- SELECT u.id, a.topic, a.read, a.write
- FROM user u
- JOIN access a ON u.user = a.user;
- DROP TABLE access;
- DROP TABLE user_old;
- `
- )
- // Manager is an implementation of Manager. It stores users and access control list
- // in a SQLite database.
- type Manager struct {
- db *sql.DB
- defaultAccess Permission // Default permission if no ACL matches
- statsQueue map[string]*User // Username -> User, for "unimportant" user updates
- mu sync.Mutex
- }
- var _ Auther = (*Manager)(nil)
- // NewManager creates a new Manager instance
- func NewManager(filename, startupQueries string, defaultAccess Permission) (*Manager, error) {
- return newManager(filename, startupQueries, defaultAccess, userStatsQueueWriterInterval)
- }
- // NewManager creates a new Manager instance
- func newManager(filename, startupQueries string, defaultAccess Permission, statsWriterInterval time.Duration) (*Manager, error) {
- db, err := sql.Open("sqlite3", filename)
- if err != nil {
- return nil, err
- }
- if err := setupDB(db); err != nil {
- return nil, err
- }
- if err := runStartupQueries(db, startupQueries); err != nil {
- return nil, err
- }
- manager := &Manager{
- db: db,
- defaultAccess: defaultAccess,
- statsQueue: make(map[string]*User),
- }
- go manager.userStatsQueueWriter(statsWriterInterval)
- return manager, nil
- }
- // Authenticate checks username and password and returns a User if correct. The method
- // returns in constant-ish time, regardless of whether the user exists or the password is
- // correct or incorrect.
- func (a *Manager) Authenticate(username, password string) (*User, error) {
- if username == Everyone {
- return nil, ErrUnauthenticated
- }
- user, err := a.User(username)
- if err != nil {
- log.Trace("authentication of user %s failed (1): %s", username, err.Error())
- bcrypt.CompareHashAndPassword([]byte(intentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks"))
- return nil, ErrUnauthenticated
- }
- if err := bcrypt.CompareHashAndPassword([]byte(user.Hash), []byte(password)); err != nil {
- log.Trace("authentication of user %s failed (2): %s", username, err.Error())
- return nil, ErrUnauthenticated
- }
- return user, nil
- }
- // AuthenticateToken checks if the token exists and returns the associated User if it does.
- // The method sets the User.Token value to the token that was used for authentication.
- func (a *Manager) AuthenticateToken(token string) (*User, error) {
- if len(token) != tokenLength {
- return nil, ErrUnauthenticated
- }
- user, err := a.userByToken(token)
- if err != nil {
- return nil, ErrUnauthenticated
- }
- user.Token = token
- return user, nil
- }
- // CreateToken generates a random token for the given user and returns it. The token expires
- // after a fixed duration unless ExtendToken is called. This function also prunes tokens for the
- // given user, if there are too many of them.
- func (a *Manager) CreateToken(user *User) (*Token, error) {
- token, expires := util.RandomString(tokenLength), time.Now().Add(tokenExpiryDuration)
- tx, err := a.db.Begin()
- if err != nil {
- return nil, err
- }
- defer tx.Rollback()
- if _, err := tx.Exec(insertTokenQuery, user.Name, token, expires.Unix()); err != nil {
- return nil, err
- }
- rows, err := tx.Query(selectTokenCountQuery, user.Name)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- if !rows.Next() {
- return nil, errNoRows
- }
- var tokenCount int
- if err := rows.Scan(&tokenCount); err != nil {
- return nil, err
- }
- if tokenCount >= tokenMaxCount {
- // This pruning logic is done in two queries for efficiency. The SELECT above is a lookup
- // on two indices, whereas the query below is a full table scan.
- if _, err := tx.Exec(deleteExcessTokensQuery, user.Name, tokenMaxCount); err != nil {
- return nil, err
- }
- }
- if err := tx.Commit(); err != nil {
- return nil, err
- }
- return &Token{
- Value: token,
- Expires: expires,
- }, nil
- }
- // ExtendToken sets the new expiry date for a token, thereby extending its use further into the future.
- func (a *Manager) ExtendToken(user *User) (*Token, error) {
- if user.Token == "" {
- return nil, errNoTokenProvided
- }
- newExpires := time.Now().Add(tokenExpiryDuration)
- if _, err := a.db.Exec(updateTokenExpiryQuery, newExpires.Unix(), user.Name, user.Token); err != nil {
- return nil, err
- }
- return &Token{
- Value: user.Token,
- Expires: newExpires,
- }, nil
- }
- // RemoveToken deletes the token defined in User.Token
- func (a *Manager) RemoveToken(user *User) error {
- if user.Token == "" {
- return ErrUnauthorized
- }
- if _, err := a.db.Exec(deleteTokenQuery, user.Name, user.Token); err != nil {
- return err
- }
- return nil
- }
- // RemoveExpiredTokens deletes all expired tokens from the database
- func (a *Manager) RemoveExpiredTokens() error {
- if _, err := a.db.Exec(deleteExpiredTokensQuery, time.Now().Unix()); err != nil {
- return err
- }
- return nil
- }
- // ChangeSettings persists the user settings
- func (a *Manager) ChangeSettings(user *User) error {
- settings, err := json.Marshal(user.Prefs)
- if err != nil {
- return err
- }
- if _, err := a.db.Exec(updateUserSettingsQuery, string(settings), user.Name); err != nil {
- return err
- }
- return nil
- }
- // EnqueueStats adds the user to a queue which writes out user stats (messages, emails, ..) in
- // batches at a regular interval
- func (a *Manager) EnqueueStats(user *User) {
- a.mu.Lock()
- defer a.mu.Unlock()
- a.statsQueue[user.Name] = user
- }
- func (a *Manager) userStatsQueueWriter(interval time.Duration) {
- ticker := time.NewTicker(interval)
- for range ticker.C {
- if err := a.writeUserStatsQueue(); err != nil {
- log.Warn("UserManager: Writing user stats queue failed: %s", err.Error())
- }
- }
- }
- func (a *Manager) writeUserStatsQueue() error {
- a.mu.Lock()
- if len(a.statsQueue) == 0 {
- a.mu.Unlock()
- log.Trace("UserManager: No user stats updates to commit")
- return nil
- }
- statsQueue := a.statsQueue
- a.statsQueue = make(map[string]*User)
- a.mu.Unlock()
- tx, err := a.db.Begin()
- if err != nil {
- return err
- }
- defer tx.Rollback()
- log.Debug("UserManager: Writing user stats queue for %d user(s)", len(statsQueue))
- for username, u := range statsQueue {
- log.Trace("UserManager: Updating stats for user %s: messages=%d, emails=%d", username, u.Stats.Messages, u.Stats.Emails)
- if _, err := tx.Exec(updateUserStatsQuery, u.Stats.Messages, u.Stats.Emails, username); err != nil {
- return err
- }
- }
- return tx.Commit()
- }
- // Authorize returns nil if the given user has access to the given topic using the desired
- // permission. The user param may be nil to signal an anonymous user.
- func (a *Manager) Authorize(user *User, topic string, perm Permission) error {
- if user != nil && user.Role == RoleAdmin {
- return nil // Admin can do everything
- }
- username := Everyone
- if user != nil {
- username = user.Name
- }
- // Select the read/write permissions for this user/topic combo. The query may return two
- // rows (one for everyone, and one for the user), but prioritizes the user.
- rows, err := a.db.Query(selectTopicPermsQuery, Everyone, username, topic)
- if err != nil {
- return err
- }
- defer rows.Close()
- if !rows.Next() {
- return a.resolvePerms(a.defaultAccess, perm)
- }
- var read, write bool
- if err := rows.Scan(&read, &write); err != nil {
- return err
- } else if err := rows.Err(); err != nil {
- return err
- }
- return a.resolvePerms(NewPermission(read, write), perm)
- }
- func (a *Manager) resolvePerms(base, perm Permission) error {
- if perm == PermissionRead && base.IsRead() {
- return nil
- } else if perm == PermissionWrite && base.IsWrite() {
- return nil
- }
- return ErrUnauthorized
- }
- // AddUser adds a user with the given username, password and role
- func (a *Manager) AddUser(username, password string, role Role) error {
- if !AllowedUsername(username) || !AllowedRole(role) {
- return ErrInvalidArgument
- }
- hash, err := bcrypt.GenerateFromPassword([]byte(password), bcryptCost)
- if err != nil {
- return err
- }
- if _, err = a.db.Exec(insertUserQuery, username, hash, role); err != nil {
- return err
- }
- return nil
- }
- // RemoveUser deletes the user with the given username. The function returns nil on success, even
- // if the user did not exist in the first place.
- func (a *Manager) RemoveUser(username string) error {
- if !AllowedUsername(username) {
- return ErrInvalidArgument
- }
- // Rows in user_access, user_token, etc. are deleted via foreign keys
- if _, err := a.db.Exec(deleteUserQuery, username); err != nil {
- return err
- }
- return nil
- }
- // Users returns a list of users. It always also returns the Everyone user ("*").
- func (a *Manager) Users() ([]*User, error) {
- rows, err := a.db.Query(selectUsernamesQuery)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- usernames := make([]string, 0)
- for rows.Next() {
- var username string
- if err := rows.Scan(&username); err != nil {
- return nil, err
- } else if err := rows.Err(); err != nil {
- return nil, err
- }
- usernames = append(usernames, username)
- }
- rows.Close()
- users := make([]*User, 0)
- for _, username := range usernames {
- user, err := a.User(username)
- if err != nil {
- return nil, err
- }
- users = append(users, user)
- }
- return users, nil
- }
- // User returns the user with the given username if it exists, or ErrNotFound otherwise.
- // You may also pass Everyone to retrieve the anonymous user and its Grant list.
- func (a *Manager) User(username string) (*User, error) {
- rows, err := a.db.Query(selectUserByNameQuery, username)
- if err != nil {
- return nil, err
- }
- return a.readUser(rows)
- }
- func (a *Manager) userByToken(token string) (*User, error) {
- rows, err := a.db.Query(selectUserByTokenQuery, token, time.Now().Unix())
- if err != nil {
- return nil, err
- }
- return a.readUser(rows)
- }
- func (a *Manager) readUser(rows *sql.Rows) (*User, error) {
- defer rows.Close()
- var username, hash, role string
- var settings, planCode sql.NullString
- var messages, emails int64
- var messagesLimit, emailsLimit, topicsLimit, attachmentFileSizeLimit, attachmentTotalSizeLimit sql.NullInt64
- if !rows.Next() {
- return nil, ErrNotFound
- }
- if err := rows.Scan(&username, &hash, &role, &messages, &emails, &settings, &planCode, &messagesLimit, &emailsLimit, &topicsLimit, &attachmentFileSizeLimit, &attachmentTotalSizeLimit); err != nil {
- return nil, err
- } else if err := rows.Err(); err != nil {
- return nil, err
- }
- user := &User{
- Name: username,
- Hash: hash,
- Role: Role(role),
- Stats: &Stats{
- Messages: messages,
- Emails: emails,
- },
- }
- if settings.Valid {
- user.Prefs = &Prefs{}
- if err := json.Unmarshal([]byte(settings.String), user.Prefs); err != nil {
- return nil, err
- }
- }
- if planCode.Valid {
- user.Plan = &Plan{
- Code: planCode.String,
- Upgradeable: false,
- MessagesLimit: messagesLimit.Int64,
- EmailsLimit: emailsLimit.Int64,
- TopicsLimit: topicsLimit.Int64,
- AttachmentFileSizeLimit: attachmentFileSizeLimit.Int64,
- AttachmentTotalSizeLimit: attachmentTotalSizeLimit.Int64,
- }
- }
- return user, nil
- }
- // Grants returns all user-specific access control entries
- func (a *Manager) Grants(username string) ([]Grant, error) {
- rows, err := a.db.Query(selectUserAccessQuery, username)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- grants := make([]Grant, 0)
- for rows.Next() {
- var topic string
- var read, write bool
- if err := rows.Scan(&topic, &read, &write); err != nil {
- return nil, err
- } else if err := rows.Err(); err != nil {
- return nil, err
- }
- grants = append(grants, Grant{
- TopicPattern: fromSQLWildcard(topic),
- Allow: NewPermission(read, write),
- })
- }
- return grants, nil
- }
- // Reservations returns all user-owned topics, and the associated everyone-access
- func (a *Manager) Reservations(username string) ([]Reservation, error) {
- rows, err := a.db.Query(selectUserReservationsQuery, Everyone, username)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- reservations := make([]Reservation, 0)
- for rows.Next() {
- var topic string
- var ownerRead, ownerWrite bool
- var everyoneRead, everyoneWrite sql.NullBool
- if err := rows.Scan(&topic, &ownerRead, &ownerWrite, &everyoneRead, &everyoneWrite); err != nil {
- return nil, err
- } else if err := rows.Err(); err != nil {
- return nil, err
- }
- reservations = append(reservations, Reservation{
- Topic: topic,
- Owner: NewPermission(ownerRead, ownerWrite),
- Everyone: NewPermission(everyoneRead.Bool, everyoneWrite.Bool), // false if null
- })
- }
- return reservations, nil
- }
- // ChangePassword changes a user's password
- func (a *Manager) ChangePassword(username, password string) error {
- hash, err := bcrypt.GenerateFromPassword([]byte(password), bcryptCost)
- if err != nil {
- return err
- }
- if _, err := a.db.Exec(updateUserPassQuery, hash, username); err != nil {
- return err
- }
- return nil
- }
- // ChangeRole changes a user's role. When a role is changed from RoleUser to RoleAdmin,
- // all existing access control entries (Grant) are removed, since they are no longer needed.
- func (a *Manager) ChangeRole(username string, role Role) error {
- if !AllowedUsername(username) || !AllowedRole(role) {
- return ErrInvalidArgument
- }
- if _, err := a.db.Exec(updateUserRoleQuery, string(role), username); err != nil {
- return err
- }
- if role == RoleAdmin {
- if _, err := a.db.Exec(deleteUserAccessQuery, username); err != nil {
- return err
- }
- }
- return nil
- }
- // CheckAllowAccess tests if a user may create an access control entry for the given topic.
- // If there are any ACL entries that are not owned by the user, an error is returned.
- func (a *Manager) CheckAllowAccess(username string, topic string) error {
- if (!AllowedUsername(username) && username != Everyone) || !AllowedTopic(topic) {
- return ErrInvalidArgument
- }
- rows, err := a.db.Query(selectOtherAccessCountQuery, topic, topic, username)
- if err != nil {
- return err
- }
- defer rows.Close()
- if !rows.Next() {
- return errNoRows
- }
- var otherCount int
- if err := rows.Scan(&otherCount); err != nil {
- return err
- }
- if otherCount > 0 {
- return errTopicOwnedByOthers
- }
- return nil
- }
- // AllowAccess adds or updates an entry in th access control list for a specific user. It controls
- // read/write access to a topic. The parameter topicPattern may include wildcards (*). The ACL entry
- // owner may either be a user (username), or the system (empty).
- func (a *Manager) AllowAccess(owner, username string, topicPattern string, read bool, write bool) error {
- if !AllowedUsername(username) && username != Everyone {
- return ErrInvalidArgument
- } else if owner != "" && !AllowedUsername(owner) {
- return ErrInvalidArgument
- } else if !AllowedTopicPattern(topicPattern) {
- return ErrInvalidArgument
- }
- if _, err := a.db.Exec(upsertUserAccessQuery, username, toSQLWildcard(topicPattern), read, write, owner, owner); err != nil {
- return err
- }
- return nil
- }
- // ResetAccess removes an access control list entry for a specific username/topic, or (if topic is
- // empty) for an entire user. The parameter topicPattern may include wildcards (*).
- func (a *Manager) ResetAccess(username string, topicPattern string) error {
- if !AllowedUsername(username) && username != Everyone && username != "" {
- return ErrInvalidArgument
- } else if !AllowedTopicPattern(topicPattern) && topicPattern != "" {
- return ErrInvalidArgument
- }
- if username == "" && topicPattern == "" {
- _, err := a.db.Exec(deleteAllAccessQuery, username)
- return err
- } else if topicPattern == "" {
- _, err := a.db.Exec(deleteUserAccessQuery, username)
- return err
- }
- _, err := a.db.Exec(deleteTopicAccessQuery, username, toSQLWildcard(topicPattern))
- return err
- }
- // DefaultAccess returns the default read/write access if no access control entry matches
- func (a *Manager) DefaultAccess() Permission {
- return a.defaultAccess
- }
- func toSQLWildcard(s string) string {
- return strings.ReplaceAll(s, "*", "%")
- }
- func fromSQLWildcard(s string) string {
- return strings.ReplaceAll(s, "%", "*")
- }
- func runStartupQueries(db *sql.DB, startupQueries string) error {
- if _, err := db.Exec(startupQueries); err != nil {
- return err
- }
- if _, err := db.Exec(builtinStartupQueries); err != nil {
- return err
- }
- return nil
- }
- func setupDB(db *sql.DB) error {
- // If 'schemaVersion' table does not exist, this must be a new database
- rowsSV, err := db.Query(selectSchemaVersionQuery)
- if err != nil {
- return setupNewDB(db)
- }
- defer rowsSV.Close()
- // If 'schemaVersion' table exists, read version and potentially upgrade
- schemaVersion := 0
- if !rowsSV.Next() {
- return errors.New("cannot determine schema version: database file may be corrupt")
- }
- if err := rowsSV.Scan(&schemaVersion); err != nil {
- return err
- }
- rowsSV.Close()
- // Do migrations
- if schemaVersion == currentSchemaVersion {
- return nil
- } else if schemaVersion == 1 {
- return migrateFrom1(db)
- }
- return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
- }
- func setupNewDB(db *sql.DB) error {
- if _, err := db.Exec(createTablesQueries); err != nil {
- return err
- }
- if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
- return err
- }
- return nil
- }
- func migrateFrom1(db *sql.DB) error {
- log.Info("Migrating user database schema: from 1 to 2")
- tx, err := db.Begin()
- if err != nil {
- return err
- }
- defer tx.Rollback()
- if _, err := tx.Exec(migrate1To2RenameUserTableQueryNoTx); err != nil {
- return err
- }
- if _, err := tx.Exec(createTablesQueriesNoTx); err != nil {
- return err
- }
- if _, err := tx.Exec(migrate1To2InsertFromOldTablesAndDropNoTx); err != nil {
- return err
- }
- if _, err := tx.Exec(updateSchemaVersion, 2); err != nil {
- return err
- }
- if err := tx.Commit(); err != nil {
- return err
- }
- return nil // Update this when a new version is added
- }
|