// Package user deals with authentication and authorization against topics
package user
import (
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"net/netip"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/mattn/go-sqlite3"
	"github.com/stripe/stripe-go/v74"
	"golang.org/x/crypto/bcrypt"
	"heckel.io/ntfy/v2/log"
	"heckel.io/ntfy/v2/util"
)
  19. const (
  20. tierIDPrefix = "ti_"
  21. tierIDLength = 8
  22. syncTopicPrefix = "st_"
  23. syncTopicLength = 16
  24. userIDPrefix = "u_"
  25. userIDLength = 12
  26. userAuthIntentionalSlowDownHash = "$2a$10$YFCQvqQDwIIwnJM1xkAYOeih0dg17UVGanaTStnrSzC8NCWxcLDwy" // Cost should match DefaultUserPasswordBcryptCost
  27. userHardDeleteAfterDuration = 7 * 24 * time.Hour
  28. tokenPrefix = "tk_"
  29. tokenLength = 32
  30. tokenMaxCount = 60 // Only keep this many tokens in the table per user
  31. tag = "user_manager"
  32. )
  33. // Default constants that may be overridden by configs
  34. const (
  35. DefaultUserStatsQueueWriterInterval = 33 * time.Second
  36. DefaultUserPasswordBcryptCost = 10
  37. )
  38. var (
  39. errNoTokenProvided = errors.New("no token provided")
  40. errTopicOwnedByOthers = errors.New("topic owned by others")
  41. errNoRows = errors.New("no rows found")
  42. )
  43. // Manager-related queries
  44. const (
  45. createTablesQueries = `
  46. BEGIN;
  47. CREATE TABLE IF NOT EXISTS tier (
  48. id TEXT PRIMARY KEY,
  49. code TEXT NOT NULL,
  50. name TEXT NOT NULL,
  51. messages_limit INT NOT NULL,
  52. messages_expiry_duration INT NOT NULL,
  53. emails_limit INT NOT NULL,
  54. calls_limit INT NOT NULL,
  55. reservations_limit INT NOT NULL,
  56. attachment_file_size_limit INT NOT NULL,
  57. attachment_total_size_limit INT NOT NULL,
  58. attachment_expiry_duration INT NOT NULL,
  59. attachment_bandwidth_limit INT NOT NULL,
  60. stripe_monthly_price_id TEXT,
  61. stripe_yearly_price_id TEXT
  62. );
  63. CREATE UNIQUE INDEX idx_tier_code ON tier (code);
  64. CREATE UNIQUE INDEX idx_tier_stripe_monthly_price_id ON tier (stripe_monthly_price_id);
  65. CREATE UNIQUE INDEX idx_tier_stripe_yearly_price_id ON tier (stripe_yearly_price_id);
  66. CREATE TABLE IF NOT EXISTS user (
  67. id TEXT PRIMARY KEY,
  68. tier_id TEXT,
  69. user TEXT NOT NULL,
  70. pass TEXT NOT NULL,
  71. role TEXT CHECK (role IN ('anonymous', 'admin', 'user')) NOT NULL,
  72. prefs JSON NOT NULL DEFAULT '{}',
  73. sync_topic TEXT NOT NULL,
  74. provisioned INT NOT NULL,
  75. stats_messages INT NOT NULL DEFAULT (0),
  76. stats_emails INT NOT NULL DEFAULT (0),
  77. stats_calls INT NOT NULL DEFAULT (0),
  78. stripe_customer_id TEXT,
  79. stripe_subscription_id TEXT,
  80. stripe_subscription_status TEXT,
  81. stripe_subscription_interval TEXT,
  82. stripe_subscription_paid_until INT,
  83. stripe_subscription_cancel_at INT,
  84. created INT NOT NULL,
  85. deleted INT,
  86. FOREIGN KEY (tier_id) REFERENCES tier (id)
  87. );
  88. CREATE UNIQUE INDEX idx_user ON user (user);
  89. CREATE UNIQUE INDEX idx_user_stripe_customer_id ON user (stripe_customer_id);
  90. CREATE UNIQUE INDEX idx_user_stripe_subscription_id ON user (stripe_subscription_id);
  91. CREATE TABLE IF NOT EXISTS user_access (
  92. user_id TEXT NOT NULL,
  93. topic TEXT NOT NULL,
  94. read INT NOT NULL,
  95. write INT NOT NULL,
  96. owner_user_id INT,
  97. provisioned INT NOT NULL,
  98. PRIMARY KEY (user_id, topic),
  99. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
  100. FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
  101. );
  102. CREATE TABLE IF NOT EXISTS user_token (
  103. user_id TEXT NOT NULL,
  104. token TEXT NOT NULL,
  105. label TEXT NOT NULL,
  106. last_access INT NOT NULL,
  107. last_origin TEXT NOT NULL,
  108. expires INT NOT NULL,
  109. provisioned INT NOT NULL,
  110. PRIMARY KEY (user_id, token),
  111. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
  112. );
  113. CREATE UNIQUE INDEX idx_user_token ON user_token (token);
  114. CREATE TABLE IF NOT EXISTS user_phone (
  115. user_id TEXT NOT NULL,
  116. phone_number TEXT NOT NULL,
  117. PRIMARY KEY (user_id, phone_number),
  118. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
  119. );
  120. CREATE TABLE IF NOT EXISTS schemaVersion (
  121. id INT PRIMARY KEY,
  122. version INT NOT NULL
  123. );
  124. INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created)
  125. VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, UNIXEPOCH())
  126. ON CONFLICT (id) DO NOTHING;
  127. COMMIT;
  128. `
  129. builtinStartupQueries = `
  130. PRAGMA foreign_keys = ON;
  131. `
  132. selectUserByIDQuery = `
  133. SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
  134. FROM user u
  135. LEFT JOIN tier t on t.id = u.tier_id
  136. WHERE u.id = ?
  137. `
  138. selectUserByNameQuery = `
  139. SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
  140. FROM user u
  141. LEFT JOIN tier t on t.id = u.tier_id
  142. WHERE user = ?
  143. `
  144. selectUserByTokenQuery = `
  145. SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
  146. FROM user u
  147. JOIN user_token tk on u.id = tk.user_id
  148. LEFT JOIN tier t on t.id = u.tier_id
  149. WHERE tk.token = ? AND (tk.expires = 0 OR tk.expires >= ?)
  150. `
  151. selectUserByStripeCustomerIDQuery = `
  152. SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
  153. FROM user u
  154. LEFT JOIN tier t on t.id = u.tier_id
  155. WHERE u.stripe_customer_id = ?
  156. `
  157. selectTopicPermsQuery = `
  158. SELECT read, write
  159. FROM user_access a
  160. JOIN user u ON u.id = a.user_id
  161. WHERE (u.user = ? OR u.user = ?) AND ? LIKE a.topic ESCAPE '\'
  162. ORDER BY u.user DESC, LENGTH(a.topic) DESC, a.write DESC
  163. `
  164. insertUserQuery = `
  165. INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created)
  166. VALUES (?, ?, ?, ?, ?, ?, ?)
  167. `
  168. selectUsernamesQuery = `
  169. SELECT user
  170. FROM user
  171. ORDER BY
  172. CASE role
  173. WHEN 'admin' THEN 1
  174. WHEN 'anonymous' THEN 3
  175. ELSE 2
  176. END, user
  177. `
  178. selectUserCountQuery = `SELECT COUNT(*) FROM user`
  179. selectUserIDFromUsernameQuery = `SELECT id FROM user WHERE user = ?`
  180. updateUserPassQuery = `UPDATE user SET pass = ? WHERE user = ?`
  181. updateUserRoleQuery = `UPDATE user SET role = ? WHERE user = ?`
  182. updateUserProvisionedQuery = `UPDATE user SET provisioned = ? WHERE user = ?`
  183. updateUserPrefsQuery = `UPDATE user SET prefs = ? WHERE id = ?`
  184. updateUserStatsQuery = `UPDATE user SET stats_messages = ?, stats_emails = ?, stats_calls = ? WHERE id = ?`
  185. updateUserStatsResetAllQuery = `UPDATE user SET stats_messages = 0, stats_emails = 0, stats_calls = 0`
  186. updateUserDeletedQuery = `UPDATE user SET deleted = ? WHERE id = ?`
  187. deleteUsersMarkedQuery = `DELETE FROM user WHERE deleted < ?`
  188. deleteUserQuery = `DELETE FROM user WHERE user = ?`
  189. upsertUserAccessQuery = `
  190. INSERT INTO user_access (user_id, topic, read, write, owner_user_id, provisioned)
  191. VALUES ((SELECT id FROM user WHERE user = ?), ?, ?, ?, (SELECT IIF(?='',NULL,(SELECT id FROM user WHERE user=?))), ?)
  192. ON CONFLICT (user_id, topic)
  193. DO UPDATE SET read=excluded.read, write=excluded.write, owner_user_id=excluded.owner_user_id, provisioned=excluded.provisioned
  194. `
  195. selectUserAllAccessQuery = `
  196. SELECT user_id, topic, read, write, provisioned
  197. FROM user_access
  198. ORDER BY LENGTH(topic) DESC, write DESC, read DESC, topic
  199. `
  200. selectUserAccessQuery = `
  201. SELECT topic, read, write, provisioned
  202. FROM user_access
  203. WHERE user_id = (SELECT id FROM user WHERE user = ?)
  204. ORDER BY LENGTH(topic) DESC, write DESC, read DESC, topic
  205. `
  206. selectUserReservationsQuery = `
  207. SELECT a_user.topic, a_user.read, a_user.write, a_everyone.read AS everyone_read, a_everyone.write AS everyone_write
  208. FROM user_access a_user
  209. LEFT JOIN user_access a_everyone ON a_user.topic = a_everyone.topic AND a_everyone.user_id = (SELECT id FROM user WHERE user = ?)
  210. WHERE a_user.user_id = a_user.owner_user_id
  211. AND a_user.owner_user_id = (SELECT id FROM user WHERE user = ?)
  212. ORDER BY a_user.topic
  213. `
  214. selectUserReservationsCountQuery = `
  215. SELECT COUNT(*)
  216. FROM user_access
  217. WHERE user_id = owner_user_id
  218. AND owner_user_id = (SELECT id FROM user WHERE user = ?)
  219. `
  220. selectUserReservationsOwnerQuery = `
  221. SELECT owner_user_id
  222. FROM user_access
  223. WHERE topic = ?
  224. AND user_id = owner_user_id
  225. `
  226. selectUserHasReservationQuery = `
  227. SELECT COUNT(*)
  228. FROM user_access
  229. WHERE user_id = owner_user_id
  230. AND owner_user_id = (SELECT id FROM user WHERE user = ?)
  231. AND topic = ?
  232. `
  233. selectOtherAccessCountQuery = `
  234. SELECT COUNT(*)
  235. FROM user_access
  236. WHERE (topic = ? OR ? LIKE topic ESCAPE '\')
  237. AND (owner_user_id IS NULL OR owner_user_id != (SELECT id FROM user WHERE user = ?))
  238. `
  239. deleteAllAccessQuery = `DELETE FROM user_access`
  240. deleteUserAccessQuery = `
  241. DELETE FROM user_access
  242. WHERE user_id = (SELECT id FROM user WHERE user = ?)
  243. OR owner_user_id = (SELECT id FROM user WHERE user = ?)
  244. `
  245. deleteUserAccessProvisionedQuery = `DELETE FROM user_access WHERE provisioned = 1`
  246. deleteTopicAccessQuery = `
  247. DELETE FROM user_access
  248. WHERE (user_id = (SELECT id FROM user WHERE user = ?) OR owner_user_id = (SELECT id FROM user WHERE user = ?))
  249. AND topic = ?
  250. `
  251. selectTokenCountQuery = `SELECT COUNT(*) FROM user_token WHERE user_id = ?`
  252. selectTokensQuery = `SELECT token, label, last_access, last_origin, expires, provisioned FROM user_token WHERE user_id = ?`
  253. selectTokenQuery = `SELECT token, label, last_access, last_origin, expires, provisioned FROM user_token WHERE user_id = ? AND token = ?`
  254. upsertTokenQuery = `
  255. INSERT INTO user_token (user_id, token, label, last_access, last_origin, expires, provisioned)
  256. VALUES (?, ?, ?, ?, ?, ?, ?)
  257. ON CONFLICT (user_id, token)
  258. DO UPDATE SET label = excluded.label, expires = excluded.expires, provisioned = excluded.provisioned;
  259. `
  260. updateTokenExpiryQuery = `UPDATE user_token SET expires = ? WHERE user_id = ? AND token = ?`
  261. updateTokenLabelQuery = `UPDATE user_token SET label = ? WHERE user_id = ? AND token = ?`
  262. updateTokenLastAccessQuery = `UPDATE user_token SET last_access = ?, last_origin = ? WHERE token = ?`
  263. deleteTokenQuery = `DELETE FROM user_token WHERE user_id = ? AND token = ?`
  264. deleteAllTokenQuery = `DELETE FROM user_token WHERE user_id = ?`
  265. deleteTokensProvisionedQuery = `DELETE FROM user_token WHERE provisioned = 1`
  266. deleteExpiredTokensQuery = `DELETE FROM user_token WHERE expires > 0 AND expires < ?`
  267. deleteExcessTokensQuery = `
  268. DELETE FROM user_token
  269. WHERE user_id = ?
  270. AND (user_id, token) NOT IN (
  271. SELECT user_id, token
  272. FROM user_token
  273. WHERE user_id = ?
  274. ORDER BY expires DESC
  275. LIMIT ?
  276. )
  277. `
  278. selectPhoneNumbersQuery = `SELECT phone_number FROM user_phone WHERE user_id = ?`
  279. insertPhoneNumberQuery = `INSERT INTO user_phone (user_id, phone_number) VALUES (?, ?)`
  280. deletePhoneNumberQuery = `DELETE FROM user_phone WHERE user_id = ? AND phone_number = ?`
  281. insertTierQuery = `
  282. INSERT INTO tier (id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id)
  283. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  284. `
  285. updateTierQuery = `
  286. UPDATE tier
  287. SET name = ?, messages_limit = ?, messages_expiry_duration = ?, emails_limit = ?, calls_limit = ?, reservations_limit = ?, attachment_file_size_limit = ?, attachment_total_size_limit = ?, attachment_expiry_duration = ?, attachment_bandwidth_limit = ?, stripe_monthly_price_id = ?, stripe_yearly_price_id = ?
  288. WHERE code = ?
  289. `
  290. selectTiersQuery = `
  291. SELECT id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id
  292. FROM tier
  293. `
  294. selectTierByCodeQuery = `
  295. SELECT id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id
  296. FROM tier
  297. WHERE code = ?
  298. `
  299. selectTierByPriceIDQuery = `
  300. SELECT id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id
  301. FROM tier
  302. WHERE (stripe_monthly_price_id = ? OR stripe_yearly_price_id = ?)
  303. `
  304. updateUserTierQuery = `UPDATE user SET tier_id = (SELECT id FROM tier WHERE code = ?) WHERE user = ?`
  305. deleteUserTierQuery = `UPDATE user SET tier_id = null WHERE user = ?`
  306. deleteTierQuery = `DELETE FROM tier WHERE code = ?`
  307. updateBillingQuery = `
  308. UPDATE user
  309. SET stripe_customer_id = ?, stripe_subscription_id = ?, stripe_subscription_status = ?, stripe_subscription_interval = ?, stripe_subscription_paid_until = ?, stripe_subscription_cancel_at = ?
  310. WHERE user = ?
  311. `
  312. )
  313. // Schema management queries
  314. const (
  315. currentSchemaVersion = 6
  316. insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  317. updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
  318. selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  319. // 1 -> 2 (complex migration!)
  320. migrate1To2CreateTablesQueries = `
  321. ALTER TABLE user RENAME TO user_old;
  322. CREATE TABLE IF NOT EXISTS tier (
  323. id TEXT PRIMARY KEY,
  324. code TEXT NOT NULL,
  325. name TEXT NOT NULL,
  326. messages_limit INT NOT NULL,
  327. messages_expiry_duration INT NOT NULL,
  328. emails_limit INT NOT NULL,
  329. reservations_limit INT NOT NULL,
  330. attachment_file_size_limit INT NOT NULL,
  331. attachment_total_size_limit INT NOT NULL,
  332. attachment_expiry_duration INT NOT NULL,
  333. attachment_bandwidth_limit INT NOT NULL,
  334. stripe_price_id TEXT
  335. );
  336. CREATE UNIQUE INDEX idx_tier_code ON tier (code);
  337. CREATE UNIQUE INDEX idx_tier_price_id ON tier (stripe_price_id);
  338. CREATE TABLE IF NOT EXISTS user (
  339. id TEXT PRIMARY KEY,
  340. tier_id TEXT,
  341. user TEXT NOT NULL,
  342. pass TEXT NOT NULL,
  343. role TEXT CHECK (role IN ('anonymous', 'admin', 'user')) NOT NULL,
  344. prefs JSON NOT NULL DEFAULT '{}',
  345. sync_topic TEXT NOT NULL,
  346. stats_messages INT NOT NULL DEFAULT (0),
  347. stats_emails INT NOT NULL DEFAULT (0),
  348. stripe_customer_id TEXT,
  349. stripe_subscription_id TEXT,
  350. stripe_subscription_status TEXT,
  351. stripe_subscription_paid_until INT,
  352. stripe_subscription_cancel_at INT,
  353. created INT NOT NULL,
  354. deleted INT,
  355. FOREIGN KEY (tier_id) REFERENCES tier (id)
  356. );
  357. CREATE UNIQUE INDEX idx_user ON user (user);
  358. CREATE UNIQUE INDEX idx_user_stripe_customer_id ON user (stripe_customer_id);
  359. CREATE UNIQUE INDEX idx_user_stripe_subscription_id ON user (stripe_subscription_id);
  360. CREATE TABLE IF NOT EXISTS user_access (
  361. user_id TEXT NOT NULL,
  362. topic TEXT NOT NULL,
  363. read INT NOT NULL,
  364. write INT NOT NULL,
  365. owner_user_id INT,
  366. PRIMARY KEY (user_id, topic),
  367. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
  368. FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
  369. );
  370. CREATE TABLE IF NOT EXISTS user_token (
  371. user_id TEXT NOT NULL,
  372. token TEXT NOT NULL,
  373. label TEXT NOT NULL,
  374. last_access INT NOT NULL,
  375. last_origin TEXT NOT NULL,
  376. expires INT NOT NULL,
  377. PRIMARY KEY (user_id, token),
  378. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
  379. );
  380. CREATE TABLE IF NOT EXISTS schemaVersion (
  381. id INT PRIMARY KEY,
  382. version INT NOT NULL
  383. );
  384. INSERT INTO user (id, user, pass, role, sync_topic, created)
  385. VALUES ('u_everyone', '*', '', 'anonymous', '', UNIXEPOCH())
  386. ON CONFLICT (id) DO NOTHING;
  387. `
  388. migrate1To2SelectAllOldUsernamesNoTx = `SELECT user FROM user_old`
  389. migrate1To2InsertUserNoTx = `
  390. INSERT INTO user (id, user, pass, role, sync_topic, created)
  391. SELECT ?, user, pass, role, ?, UNIXEPOCH() FROM user_old WHERE user = ?
  392. `
  393. migrate1To2InsertFromOldTablesAndDropNoTx = `
  394. INSERT INTO user_access (user_id, topic, read, write)
  395. SELECT u.id, a.topic, a.read, a.write
  396. FROM user u
  397. JOIN access a ON u.user = a.user;
  398. DROP TABLE access;
  399. DROP TABLE user_old;
  400. `
  401. // 2 -> 3
  402. migrate2To3UpdateQueries = `
  403. ALTER TABLE user ADD COLUMN stripe_subscription_interval TEXT;
  404. ALTER TABLE tier RENAME COLUMN stripe_price_id TO stripe_monthly_price_id;
  405. ALTER TABLE tier ADD COLUMN stripe_yearly_price_id TEXT;
  406. DROP INDEX IF EXISTS idx_tier_price_id;
  407. CREATE UNIQUE INDEX idx_tier_stripe_monthly_price_id ON tier (stripe_monthly_price_id);
  408. CREATE UNIQUE INDEX idx_tier_stripe_yearly_price_id ON tier (stripe_yearly_price_id);
  409. `
  410. // 3 -> 4
  411. migrate3To4UpdateQueries = `
  412. ALTER TABLE tier ADD COLUMN calls_limit INT NOT NULL DEFAULT (0);
  413. ALTER TABLE user ADD COLUMN stats_calls INT NOT NULL DEFAULT (0);
  414. CREATE TABLE IF NOT EXISTS user_phone (
  415. user_id TEXT NOT NULL,
  416. phone_number TEXT NOT NULL,
  417. PRIMARY KEY (user_id, phone_number),
  418. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
  419. );
  420. `
  421. // 4 -> 5
  422. migrate4To5UpdateQueries = `
  423. UPDATE user_access SET topic = REPLACE(topic, '_', '\_');
  424. `
  425. // 5 -> 6
  426. migrate5To6UpdateQueries = `
  427. PRAGMA foreign_keys=off;
  428. -- Alter user table: Add provisioned column
  429. ALTER TABLE user RENAME TO user_old;
  430. CREATE TABLE IF NOT EXISTS user (
  431. id TEXT PRIMARY KEY,
  432. tier_id TEXT,
  433. user TEXT NOT NULL,
  434. pass TEXT NOT NULL,
  435. role TEXT CHECK (role IN ('anonymous', 'admin', 'user')) NOT NULL,
  436. prefs JSON NOT NULL DEFAULT '{}',
  437. sync_topic TEXT NOT NULL,
  438. provisioned INT NOT NULL,
  439. stats_messages INT NOT NULL DEFAULT (0),
  440. stats_emails INT NOT NULL DEFAULT (0),
  441. stats_calls INT NOT NULL DEFAULT (0),
  442. stripe_customer_id TEXT,
  443. stripe_subscription_id TEXT,
  444. stripe_subscription_status TEXT,
  445. stripe_subscription_interval TEXT,
  446. stripe_subscription_paid_until INT,
  447. stripe_subscription_cancel_at INT,
  448. created INT NOT NULL,
  449. deleted INT,
  450. FOREIGN KEY (tier_id) REFERENCES tier (id)
  451. );
  452. INSERT INTO user
  453. SELECT
  454. id,
  455. tier_id,
  456. user,
  457. pass,
  458. role,
  459. prefs,
  460. sync_topic,
  461. 0, -- provisioned
  462. stats_messages,
  463. stats_emails,
  464. stats_calls,
  465. stripe_customer_id,
  466. stripe_subscription_id,
  467. stripe_subscription_status,
  468. stripe_subscription_interval,
  469. stripe_subscription_paid_until,
  470. stripe_subscription_cancel_at,
  471. created,
  472. deleted
  473. FROM user_old;
  474. DROP TABLE user_old;
  475. -- Alter user_access table: Add provisioned column
  476. ALTER TABLE user_access RENAME TO user_access_old;
  477. CREATE TABLE user_access (
  478. user_id TEXT NOT NULL,
  479. topic TEXT NOT NULL,
  480. read INT NOT NULL,
  481. write INT NOT NULL,
  482. owner_user_id INT,
  483. provisioned INTEGER NOT NULL,
  484. PRIMARY KEY (user_id, topic),
  485. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
  486. FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
  487. );
  488. INSERT INTO user_access SELECT *, 0 FROM user_access_old;
  489. DROP TABLE user_access_old;
  490. -- Alter user_token table: Add provisioned column
  491. ALTER TABLE user_token RENAME TO user_token_old;
  492. CREATE TABLE IF NOT EXISTS user_token (
  493. user_id TEXT NOT NULL,
  494. token TEXT NOT NULL,
  495. label TEXT NOT NULL,
  496. last_access INT NOT NULL,
  497. last_origin TEXT NOT NULL,
  498. expires INT NOT NULL,
  499. provisioned INT NOT NULL,
  500. PRIMARY KEY (user_id, token),
  501. FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
  502. );
  503. INSERT INTO user_token SELECT *, 0 FROM user_token_old;
  504. DROP TABLE user_token_old;
  505. -- Recreate indices
  506. CREATE UNIQUE INDEX idx_user ON user (user);
  507. CREATE UNIQUE INDEX idx_user_stripe_customer_id ON user (stripe_customer_id);
  508. CREATE UNIQUE INDEX idx_user_stripe_subscription_id ON user (stripe_subscription_id);
  509. CREATE UNIQUE INDEX idx_user_token ON user_token (token);
  510. -- Re-enable foreign keys
  511. PRAGMA foreign_keys=on;
  512. `
  513. )
  514. var (
  515. migrations = map[int]func(db *sql.DB) error{
  516. 1: migrateFrom1,
  517. 2: migrateFrom2,
  518. 3: migrateFrom3,
  519. 4: migrateFrom4,
  520. 5: migrateFrom5,
  521. }
  522. )
  523. // Manager is an implementation of Manager. It stores users and access control list
  524. // in a SQLite database.
  525. type Manager struct {
  526. config *Config
  527. db *sql.DB
  528. statsQueue map[string]*Stats // "Queue" to asynchronously write user stats to the database (UserID -> Stats)
  529. tokenQueue map[string]*TokenUpdate // "Queue" to asynchronously write token access stats to the database (Token ID -> TokenUpdate)
  530. mu sync.Mutex
  531. }
// Config holds the configuration for the user Manager
type Config struct {
	Filename            string              // Database filename, e.g. "/var/lib/ntfy/user.db"
	StartupQueries      string              // Queries to run on startup, e.g. to create initial users or tiers
	DefaultAccess       Permission          // Default permission if no ACL matches
	ProvisionEnabled    bool                // Enable auto-provisioning of users and access grants, disabled for "ntfy user" commands
	Users               []*User             // Predefined users to create on startup
	Access              map[string][]*Grant // Predefined access grants to create on startup (username -> []*Grant)
	Tokens              map[string][]*Token // Predefined tokens to create on startup (username -> []*Token)
	QueueWriterInterval time.Duration       // Interval for the async queue writer to flush stats and token updates to the database
	BcryptCost          int                 // bcrypt cost used when hashing passwords; lowering makes testing faster
}
// Compile-time check that *Manager satisfies the Auther interface
var _ Auther = (*Manager)(nil)
  545. // NewManager creates a new Manager instance
  546. func NewManager(config *Config) (*Manager, error) {
  547. // Set defaults
  548. if config.BcryptCost <= 0 {
  549. config.BcryptCost = DefaultUserPasswordBcryptCost
  550. }
  551. if config.QueueWriterInterval.Seconds() <= 0 {
  552. config.QueueWriterInterval = DefaultUserStatsQueueWriterInterval
  553. }
  554. // Check the parent directory of the database file (makes for friendly error messages)
  555. parentDir := filepath.Dir(config.Filename)
  556. if !util.FileExists(parentDir) {
  557. return nil, fmt.Errorf("user database directory %s does not exist or is not accessible", parentDir)
  558. }
  559. // Open DB and run setup queries
  560. db, err := sql.Open("sqlite3", config.Filename)
  561. if err != nil {
  562. return nil, err
  563. }
  564. if err := setupDB(db); err != nil {
  565. return nil, err
  566. }
  567. if err := runStartupQueries(db, config.StartupQueries); err != nil {
  568. return nil, err
  569. }
  570. manager := &Manager{
  571. db: db,
  572. config: config,
  573. statsQueue: make(map[string]*Stats),
  574. tokenQueue: make(map[string]*TokenUpdate),
  575. }
  576. if err := manager.maybeProvisionUsersAndAccess(); err != nil {
  577. return nil, err
  578. }
  579. go manager.asyncQueueWriter(config.QueueWriterInterval)
  580. return manager, nil
  581. }
  582. // Authenticate checks username and password and returns a User if correct, and the user has not been
  583. // marked as deleted. The method returns in constant-ish time, regardless of whether the user exists or
  584. // the password is correct or incorrect.
  585. func (a *Manager) Authenticate(username, password string) (*User, error) {
  586. if username == Everyone {
  587. return nil, ErrUnauthenticated
  588. }
  589. user, err := a.User(username)
  590. if err != nil {
  591. log.Tag(tag).Field("user_name", username).Err(err).Trace("Authentication of user failed (1)")
  592. bcrypt.CompareHashAndPassword([]byte(userAuthIntentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks"))
  593. return nil, ErrUnauthenticated
  594. } else if user.Deleted {
  595. log.Tag(tag).Field("user_name", username).Trace("Authentication of user failed (2): user marked deleted")
  596. bcrypt.CompareHashAndPassword([]byte(userAuthIntentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks"))
  597. return nil, ErrUnauthenticated
  598. } else if err := bcrypt.CompareHashAndPassword([]byte(user.Hash), []byte(password)); err != nil {
  599. log.Tag(tag).Field("user_name", username).Err(err).Trace("Authentication of user failed (3)")
  600. return nil, ErrUnauthenticated
  601. }
  602. return user, nil
  603. }
  604. // AuthenticateToken checks if the token exists and returns the associated User if it does.
  605. // The method sets the User.Token value to the token that was used for authentication.
  606. func (a *Manager) AuthenticateToken(token string) (*User, error) {
  607. if len(token) != tokenLength {
  608. return nil, ErrUnauthenticated
  609. }
  610. user, err := a.userByToken(token)
  611. if err != nil {
  612. log.Tag(tag).Field("token", token).Err(err).Trace("Authentication of token failed")
  613. return nil, ErrUnauthenticated
  614. }
  615. user.Token = token
  616. return user, nil
  617. }
  618. // CreateToken generates a random token for the given user and returns it. The token expires
  619. // after a fixed duration unless ChangeToken is called. This function also prunes tokens for the
  620. // given user, if there are too many of them.
  621. func (a *Manager) CreateToken(userID, label string, expires time.Time, origin netip.Addr, provisioned bool) (*Token, error) {
  622. return queryTx(a.db, func(tx *sql.Tx) (*Token, error) {
  623. return a.createTokenTx(tx, userID, GenerateToken(), label, expires, origin, provisioned)
  624. })
  625. }
  626. func (a *Manager) createTokenTx(tx *sql.Tx, userID, token, label string, expires time.Time, origin netip.Addr, provisioned bool) (*Token, error) {
  627. access := time.Now()
  628. if _, err := tx.Exec(upsertTokenQuery, userID, token, label, access.Unix(), origin.String(), expires.Unix(), provisioned); err != nil {
  629. return nil, err
  630. }
  631. rows, err := tx.Query(selectTokenCountQuery, userID)
  632. if err != nil {
  633. return nil, err
  634. }
  635. defer rows.Close()
  636. if !rows.Next() {
  637. return nil, errNoRows
  638. }
  639. var tokenCount int
  640. if err := rows.Scan(&tokenCount); err != nil {
  641. return nil, err
  642. }
  643. if tokenCount >= tokenMaxCount {
  644. // This pruning logic is done in two queries for efficiency. The SELECT above is a lookup
  645. // on two indices, whereas the query below is a full table scan.
  646. if _, err := tx.Exec(deleteExcessTokensQuery, userID, userID, tokenMaxCount); err != nil {
  647. return nil, err
  648. }
  649. }
  650. return &Token{
  651. Value: token,
  652. Label: label,
  653. LastAccess: access,
  654. LastOrigin: origin,
  655. Expires: expires,
  656. Provisioned: provisioned,
  657. }, nil
  658. }
  659. // Tokens returns all existing tokens for the user with the given user ID
  660. func (a *Manager) Tokens(userID string) ([]*Token, error) {
  661. rows, err := a.db.Query(selectTokensQuery, userID)
  662. if err != nil {
  663. return nil, err
  664. }
  665. defer rows.Close()
  666. tokens := make([]*Token, 0)
  667. for {
  668. token, err := a.readToken(rows)
  669. if errors.Is(err, ErrTokenNotFound) {
  670. break
  671. } else if err != nil {
  672. return nil, err
  673. }
  674. tokens = append(tokens, token)
  675. }
  676. return tokens, nil
  677. }
  678. // Token returns a specific token for a user
  679. func (a *Manager) Token(userID, token string) (*Token, error) {
  680. rows, err := a.db.Query(selectTokenQuery, userID, token)
  681. if err != nil {
  682. return nil, err
  683. }
  684. defer rows.Close()
  685. return a.readToken(rows)
  686. }
  687. func (a *Manager) readToken(rows *sql.Rows) (*Token, error) {
  688. var token, label, lastOrigin string
  689. var lastAccess, expires int64
  690. var provisioned bool
  691. if !rows.Next() {
  692. return nil, ErrTokenNotFound
  693. }
  694. if err := rows.Scan(&token, &label, &lastAccess, &lastOrigin, &expires, &provisioned); err != nil {
  695. return nil, err
  696. } else if err := rows.Err(); err != nil {
  697. return nil, err
  698. }
  699. lastOriginIP, err := netip.ParseAddr(lastOrigin)
  700. if err != nil {
  701. lastOriginIP = netip.IPv4Unspecified()
  702. }
  703. return &Token{
  704. Value: token,
  705. Label: label,
  706. LastAccess: time.Unix(lastAccess, 0),
  707. LastOrigin: lastOriginIP,
  708. Expires: time.Unix(expires, 0),
  709. Provisioned: provisioned,
  710. }, nil
  711. }
  712. // ChangeToken updates a token's label and/or expiry date
  713. func (a *Manager) ChangeToken(userID, token string, label *string, expires *time.Time) (*Token, error) {
  714. if token == "" {
  715. return nil, errNoTokenProvided
  716. }
  717. tx, err := a.db.Begin()
  718. if err != nil {
  719. return nil, err
  720. }
  721. defer tx.Rollback()
  722. if label != nil {
  723. if _, err := tx.Exec(updateTokenLabelQuery, *label, userID, token); err != nil {
  724. return nil, err
  725. }
  726. }
  727. if expires != nil {
  728. if _, err := tx.Exec(updateTokenExpiryQuery, expires.Unix(), userID, token); err != nil {
  729. return nil, err
  730. }
  731. }
  732. if err := tx.Commit(); err != nil {
  733. return nil, err
  734. }
  735. return a.Token(userID, token)
  736. }
  737. // RemoveToken deletes the token defined in User.Token
  738. func (a *Manager) RemoveToken(userID, token string) error {
  739. if token == "" {
  740. return errNoTokenProvided
  741. }
  742. if _, err := a.db.Exec(deleteTokenQuery, userID, token); err != nil {
  743. return err
  744. }
  745. return nil
  746. }
  747. // RemoveExpiredTokens deletes all expired tokens from the database
  748. func (a *Manager) RemoveExpiredTokens() error {
  749. if _, err := a.db.Exec(deleteExpiredTokensQuery, time.Now().Unix()); err != nil {
  750. return err
  751. }
  752. return nil
  753. }
  754. // PhoneNumbers returns all phone numbers for the user with the given user ID
  755. func (a *Manager) PhoneNumbers(userID string) ([]string, error) {
  756. rows, err := a.db.Query(selectPhoneNumbersQuery, userID)
  757. if err != nil {
  758. return nil, err
  759. }
  760. defer rows.Close()
  761. phoneNumbers := make([]string, 0)
  762. for {
  763. phoneNumber, err := a.readPhoneNumber(rows)
  764. if errors.Is(err, ErrPhoneNumberNotFound) {
  765. break
  766. } else if err != nil {
  767. return nil, err
  768. }
  769. phoneNumbers = append(phoneNumbers, phoneNumber)
  770. }
  771. return phoneNumbers, nil
  772. }
  773. func (a *Manager) readPhoneNumber(rows *sql.Rows) (string, error) {
  774. var phoneNumber string
  775. if !rows.Next() {
  776. return "", ErrPhoneNumberNotFound
  777. }
  778. if err := rows.Scan(&phoneNumber); err != nil {
  779. return "", err
  780. } else if err := rows.Err(); err != nil {
  781. return "", err
  782. }
  783. return phoneNumber, nil
  784. }
  785. // AddPhoneNumber adds a phone number to the user with the given user ID
  786. func (a *Manager) AddPhoneNumber(userID string, phoneNumber string) error {
  787. if _, err := a.db.Exec(insertPhoneNumberQuery, userID, phoneNumber); err != nil {
  788. if sqliteErr, ok := err.(sqlite3.Error); ok && sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique {
  789. return ErrPhoneNumberExists
  790. }
  791. return err
  792. }
  793. return nil
  794. }
  795. // RemovePhoneNumber deletes a phone number from the user with the given user ID
  796. func (a *Manager) RemovePhoneNumber(userID string, phoneNumber string) error {
  797. _, err := a.db.Exec(deletePhoneNumberQuery, userID, phoneNumber)
  798. return err
  799. }
  800. // RemoveDeletedUsers deletes all users that have been marked deleted for
  801. func (a *Manager) RemoveDeletedUsers() error {
  802. if _, err := a.db.Exec(deleteUsersMarkedQuery, time.Now().Unix()); err != nil {
  803. return err
  804. }
  805. return nil
  806. }
  807. // ChangeSettings persists the user settings
  808. func (a *Manager) ChangeSettings(userID string, prefs *Prefs) error {
  809. b, err := json.Marshal(prefs)
  810. if err != nil {
  811. return err
  812. }
  813. if _, err := a.db.Exec(updateUserPrefsQuery, string(b), userID); err != nil {
  814. return err
  815. }
  816. return nil
  817. }
  818. // ResetStats resets all user stats in the user database. This touches all users.
  819. func (a *Manager) ResetStats() error {
  820. a.mu.Lock() // Includes database query to avoid races!
  821. defer a.mu.Unlock()
  822. if _, err := a.db.Exec(updateUserStatsResetAllQuery); err != nil {
  823. return err
  824. }
  825. a.statsQueue = make(map[string]*Stats)
  826. return nil
  827. }
  828. // EnqueueUserStats adds the user to a queue which writes out user stats (messages, emails, ..) in
  829. // batches at a regular interval
  830. func (a *Manager) EnqueueUserStats(userID string, stats *Stats) {
  831. a.mu.Lock()
  832. defer a.mu.Unlock()
  833. a.statsQueue[userID] = stats
  834. }
  835. // EnqueueTokenUpdate adds the token update to a queue which writes out token access times
  836. // in batches at a regular interval
  837. func (a *Manager) EnqueueTokenUpdate(tokenID string, update *TokenUpdate) {
  838. a.mu.Lock()
  839. defer a.mu.Unlock()
  840. a.tokenQueue[tokenID] = update
  841. }
  842. func (a *Manager) asyncQueueWriter(interval time.Duration) {
  843. ticker := time.NewTicker(interval)
  844. for range ticker.C {
  845. if err := a.writeUserStatsQueue(); err != nil {
  846. log.Tag(tag).Err(err).Warn("Writing user stats queue failed")
  847. }
  848. if err := a.writeTokenUpdateQueue(); err != nil {
  849. log.Tag(tag).Err(err).Warn("Writing token update queue failed")
  850. }
  851. }
  852. }
  853. func (a *Manager) writeUserStatsQueue() error {
  854. a.mu.Lock()
  855. if len(a.statsQueue) == 0 {
  856. a.mu.Unlock()
  857. log.Tag(tag).Trace("No user stats updates to commit")
  858. return nil
  859. }
  860. statsQueue := a.statsQueue
  861. a.statsQueue = make(map[string]*Stats)
  862. a.mu.Unlock()
  863. tx, err := a.db.Begin()
  864. if err != nil {
  865. return err
  866. }
  867. defer tx.Rollback()
  868. log.Tag(tag).Debug("Writing user stats queue for %d user(s)", len(statsQueue))
  869. for userID, update := range statsQueue {
  870. log.
  871. Tag(tag).
  872. Fields(log.Context{
  873. "user_id": userID,
  874. "messages_count": update.Messages,
  875. "emails_count": update.Emails,
  876. "calls_count": update.Calls,
  877. }).
  878. Trace("Updating stats for user %s", userID)
  879. if _, err := tx.Exec(updateUserStatsQuery, update.Messages, update.Emails, update.Calls, userID); err != nil {
  880. return err
  881. }
  882. }
  883. return tx.Commit()
  884. }
  885. func (a *Manager) writeTokenUpdateQueue() error {
  886. a.mu.Lock()
  887. if len(a.tokenQueue) == 0 {
  888. a.mu.Unlock()
  889. log.Tag(tag).Trace("No token updates to commit")
  890. return nil
  891. }
  892. tokenQueue := a.tokenQueue
  893. a.tokenQueue = make(map[string]*TokenUpdate)
  894. a.mu.Unlock()
  895. tx, err := a.db.Begin()
  896. if err != nil {
  897. return err
  898. }
  899. defer tx.Rollback()
  900. log.Tag(tag).Debug("Writing token update queue for %d token(s)", len(tokenQueue))
  901. for tokenID, update := range tokenQueue {
  902. log.Tag(tag).Trace("Updating token %s with last access time %v", tokenID, update.LastAccess.Unix())
  903. if _, err := tx.Exec(updateTokenLastAccessQuery, update.LastAccess.Unix(), update.LastOrigin.String(), tokenID); err != nil {
  904. return err
  905. }
  906. }
  907. return tx.Commit()
  908. }
  909. // Authorize returns nil if the given user has access to the given topic using the desired
  910. // permission. The user param may be nil to signal an anonymous user.
  911. func (a *Manager) Authorize(user *User, topic string, perm Permission) error {
  912. if user != nil && user.Role == RoleAdmin {
  913. return nil // Admin can do everything
  914. }
  915. username := Everyone
  916. if user != nil {
  917. username = user.Name
  918. }
  919. // Select the read/write permissions for this user/topic combo.
  920. // - The query may return two rows (one for everyone, and one for the user), but prioritizes the user.
  921. // - Furthermore, the query prioritizes more specific permissions (longer!) over more generic ones, e.g. "test*" > "*"
  922. // - It also prioritizes write permissions over read permissions
  923. rows, err := a.db.Query(selectTopicPermsQuery, Everyone, username, topic)
  924. if err != nil {
  925. return err
  926. }
  927. defer rows.Close()
  928. if !rows.Next() {
  929. return a.resolvePerms(a.config.DefaultAccess, perm)
  930. }
  931. var read, write bool
  932. if err := rows.Scan(&read, &write); err != nil {
  933. return err
  934. } else if err := rows.Err(); err != nil {
  935. return err
  936. }
  937. return a.resolvePerms(NewPermission(read, write), perm)
  938. }
  939. func (a *Manager) resolvePerms(base, perm Permission) error {
  940. if perm == PermissionRead && base.IsRead() {
  941. return nil
  942. } else if perm == PermissionWrite && base.IsWrite() {
  943. return nil
  944. }
  945. return ErrUnauthorized
  946. }
  947. // AddUser adds a user with the given username, password and role
  948. func (a *Manager) AddUser(username, password string, role Role, hashed bool) error {
  949. return execTx(a.db, func(tx *sql.Tx) error {
  950. return a.addUserTx(tx, username, password, role, hashed, false)
  951. })
  952. }
  953. // AddUser adds a user with the given username, password and role
  954. func (a *Manager) addUserTx(tx *sql.Tx, username, password string, role Role, hashed, provisioned bool) error {
  955. if !AllowedUsername(username) || !AllowedRole(role) {
  956. return ErrInvalidArgument
  957. }
  958. var hash string
  959. var err error = nil
  960. if hashed {
  961. hash = password
  962. if err := ValidPasswordHash(hash); err != nil {
  963. return err
  964. }
  965. } else {
  966. hash, err = hashPassword(password, a.config.BcryptCost)
  967. if err != nil {
  968. return err
  969. }
  970. }
  971. userID := util.RandomStringPrefix(userIDPrefix, userIDLength)
  972. syncTopic, now := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength), time.Now().Unix()
  973. if _, err = tx.Exec(insertUserQuery, userID, username, hash, role, syncTopic, provisioned, now); err != nil {
  974. if errors.Is(err, sqlite3.ErrConstraintUnique) {
  975. return ErrUserExists
  976. }
  977. return err
  978. }
  979. return nil
  980. }
  981. // RemoveUser deletes the user with the given username. The function returns nil on success, even
  982. // if the user did not exist in the first place.
  983. func (a *Manager) RemoveUser(username string) error {
  984. return execTx(a.db, func(tx *sql.Tx) error {
  985. return a.removeUserTx(tx, username)
  986. })
  987. }
  988. func (a *Manager) removeUserTx(tx *sql.Tx, username string) error {
  989. if !AllowedUsername(username) {
  990. return ErrInvalidArgument
  991. }
  992. // Rows in user_access, user_token, etc. are deleted via foreign keys
  993. if _, err := tx.Exec(deleteUserQuery, username); err != nil {
  994. return err
  995. }
  996. return nil
  997. }
  998. // MarkUserRemoved sets the deleted flag on the user, and deletes all access tokens. This prevents
  999. // successful auth via Authenticate. A background process will delete the user at a later date.
  1000. func (a *Manager) MarkUserRemoved(user *User) error {
  1001. if !AllowedUsername(user.Name) {
  1002. return ErrInvalidArgument
  1003. }
  1004. tx, err := a.db.Begin()
  1005. if err != nil {
  1006. return err
  1007. }
  1008. defer tx.Rollback()
  1009. if _, err := tx.Exec(deleteUserAccessQuery, user.Name, user.Name); err != nil {
  1010. return err
  1011. }
  1012. if _, err := tx.Exec(deleteAllTokenQuery, user.ID); err != nil {
  1013. return err
  1014. }
  1015. if _, err := tx.Exec(updateUserDeletedQuery, time.Now().Add(userHardDeleteAfterDuration).Unix(), user.ID); err != nil {
  1016. return err
  1017. }
  1018. return tx.Commit()
  1019. }
  1020. // Users returns a list of users. It always also returns the Everyone user ("*").
  1021. func (a *Manager) Users() ([]*User, error) {
  1022. rows, err := a.db.Query(selectUsernamesQuery)
  1023. if err != nil {
  1024. return nil, err
  1025. }
  1026. defer rows.Close()
  1027. usernames := make([]string, 0)
  1028. for rows.Next() {
  1029. var username string
  1030. if err := rows.Scan(&username); err != nil {
  1031. return nil, err
  1032. } else if err := rows.Err(); err != nil {
  1033. return nil, err
  1034. }
  1035. usernames = append(usernames, username)
  1036. }
  1037. rows.Close()
  1038. users := make([]*User, 0)
  1039. for _, username := range usernames {
  1040. user, err := a.User(username)
  1041. if err != nil {
  1042. return nil, err
  1043. }
  1044. users = append(users, user)
  1045. }
  1046. return users, nil
  1047. }
  1048. // UsersCount returns the number of users in the databsae
  1049. func (a *Manager) UsersCount() (int64, error) {
  1050. rows, err := a.db.Query(selectUserCountQuery)
  1051. if err != nil {
  1052. return 0, err
  1053. }
  1054. defer rows.Close()
  1055. if !rows.Next() {
  1056. return 0, errNoRows
  1057. }
  1058. var count int64
  1059. if err := rows.Scan(&count); err != nil {
  1060. return 0, err
  1061. }
  1062. return count, nil
  1063. }
  1064. // User returns the user with the given username if it exists, or ErrUserNotFound otherwise.
  1065. // You may also pass Everyone to retrieve the anonymous user and its Grant list.
  1066. func (a *Manager) User(username string) (*User, error) {
  1067. rows, err := a.db.Query(selectUserByNameQuery, username)
  1068. if err != nil {
  1069. return nil, err
  1070. }
  1071. return a.readUser(rows)
  1072. }
  1073. // UserByID returns the user with the given ID if it exists, or ErrUserNotFound otherwise
  1074. func (a *Manager) UserByID(id string) (*User, error) {
  1075. rows, err := a.db.Query(selectUserByIDQuery, id)
  1076. if err != nil {
  1077. return nil, err
  1078. }
  1079. return a.readUser(rows)
  1080. }
  1081. // UserByStripeCustomer returns the user with the given Stripe customer ID if it exists, or ErrUserNotFound otherwise.
  1082. func (a *Manager) UserByStripeCustomer(stripeCustomerID string) (*User, error) {
  1083. rows, err := a.db.Query(selectUserByStripeCustomerIDQuery, stripeCustomerID)
  1084. if err != nil {
  1085. return nil, err
  1086. }
  1087. return a.readUser(rows)
  1088. }
  1089. func (a *Manager) userByToken(token string) (*User, error) {
  1090. rows, err := a.db.Query(selectUserByTokenQuery, token, time.Now().Unix())
  1091. if err != nil {
  1092. return nil, err
  1093. }
  1094. return a.readUser(rows)
  1095. }
  1096. func (a *Manager) readUser(rows *sql.Rows) (*User, error) {
  1097. defer rows.Close()
  1098. var id, username, hash, role, prefs, syncTopic string
  1099. var provisioned bool
  1100. var stripeCustomerID, stripeSubscriptionID, stripeSubscriptionStatus, stripeSubscriptionInterval, stripeMonthlyPriceID, stripeYearlyPriceID, tierID, tierCode, tierName sql.NullString
  1101. var messages, emails, calls int64
  1102. var messagesLimit, messagesExpiryDuration, emailsLimit, callsLimit, reservationsLimit, attachmentFileSizeLimit, attachmentTotalSizeLimit, attachmentExpiryDuration, attachmentBandwidthLimit, stripeSubscriptionPaidUntil, stripeSubscriptionCancelAt, deleted sql.NullInt64
  1103. if !rows.Next() {
  1104. return nil, ErrUserNotFound
  1105. }
  1106. if err := rows.Scan(&id, &username, &hash, &role, &prefs, &syncTopic, &provisioned, &messages, &emails, &calls, &stripeCustomerID, &stripeSubscriptionID, &stripeSubscriptionStatus, &stripeSubscriptionInterval, &stripeSubscriptionPaidUntil, &stripeSubscriptionCancelAt, &deleted, &tierID, &tierCode, &tierName, &messagesLimit, &messagesExpiryDuration, &emailsLimit, &callsLimit, &reservationsLimit, &attachmentFileSizeLimit, &attachmentTotalSizeLimit, &attachmentExpiryDuration, &attachmentBandwidthLimit, &stripeMonthlyPriceID, &stripeYearlyPriceID); err != nil {
  1107. return nil, err
  1108. } else if err := rows.Err(); err != nil {
  1109. return nil, err
  1110. }
  1111. user := &User{
  1112. ID: id,
  1113. Name: username,
  1114. Hash: hash,
  1115. Role: Role(role),
  1116. Prefs: &Prefs{},
  1117. SyncTopic: syncTopic,
  1118. Provisioned: provisioned,
  1119. Stats: &Stats{
  1120. Messages: messages,
  1121. Emails: emails,
  1122. Calls: calls,
  1123. },
  1124. Billing: &Billing{
  1125. StripeCustomerID: stripeCustomerID.String, // May be empty
  1126. StripeSubscriptionID: stripeSubscriptionID.String, // May be empty
  1127. StripeSubscriptionStatus: stripe.SubscriptionStatus(stripeSubscriptionStatus.String), // May be empty
  1128. StripeSubscriptionInterval: stripe.PriceRecurringInterval(stripeSubscriptionInterval.String), // May be empty
  1129. StripeSubscriptionPaidUntil: time.Unix(stripeSubscriptionPaidUntil.Int64, 0), // May be zero
  1130. StripeSubscriptionCancelAt: time.Unix(stripeSubscriptionCancelAt.Int64, 0), // May be zero
  1131. },
  1132. Deleted: deleted.Valid,
  1133. }
  1134. if err := json.Unmarshal([]byte(prefs), user.Prefs); err != nil {
  1135. return nil, err
  1136. }
  1137. if tierCode.Valid {
  1138. // See readTier() when this is changed!
  1139. user.Tier = &Tier{
  1140. ID: tierID.String,
  1141. Code: tierCode.String,
  1142. Name: tierName.String,
  1143. MessageLimit: messagesLimit.Int64,
  1144. MessageExpiryDuration: time.Duration(messagesExpiryDuration.Int64) * time.Second,
  1145. EmailLimit: emailsLimit.Int64,
  1146. CallLimit: callsLimit.Int64,
  1147. ReservationLimit: reservationsLimit.Int64,
  1148. AttachmentFileSizeLimit: attachmentFileSizeLimit.Int64,
  1149. AttachmentTotalSizeLimit: attachmentTotalSizeLimit.Int64,
  1150. AttachmentExpiryDuration: time.Duration(attachmentExpiryDuration.Int64) * time.Second,
  1151. AttachmentBandwidthLimit: attachmentBandwidthLimit.Int64,
  1152. StripeMonthlyPriceID: stripeMonthlyPriceID.String, // May be empty
  1153. StripeYearlyPriceID: stripeYearlyPriceID.String, // May be empty
  1154. }
  1155. }
  1156. return user, nil
  1157. }
  1158. // AllGrants returns all user-specific access control entries, mapped to their respective user IDs
  1159. func (a *Manager) AllGrants() (map[string][]Grant, error) {
  1160. rows, err := a.db.Query(selectUserAllAccessQuery)
  1161. if err != nil {
  1162. return nil, err
  1163. }
  1164. defer rows.Close()
  1165. grants := make(map[string][]Grant, 0)
  1166. for rows.Next() {
  1167. var userID, topic string
  1168. var read, write, provisioned bool
  1169. if err := rows.Scan(&userID, &topic, &read, &write, &provisioned); err != nil {
  1170. return nil, err
  1171. } else if err := rows.Err(); err != nil {
  1172. return nil, err
  1173. }
  1174. if _, ok := grants[userID]; !ok {
  1175. grants[userID] = make([]Grant, 0)
  1176. }
  1177. grants[userID] = append(grants[userID], Grant{
  1178. TopicPattern: fromSQLWildcard(topic),
  1179. Permission: NewPermission(read, write),
  1180. Provisioned: provisioned,
  1181. })
  1182. }
  1183. return grants, nil
  1184. }
  1185. // Grants returns all user-specific access control entries
  1186. func (a *Manager) Grants(username string) ([]Grant, error) {
  1187. rows, err := a.db.Query(selectUserAccessQuery, username)
  1188. if err != nil {
  1189. return nil, err
  1190. }
  1191. defer rows.Close()
  1192. grants := make([]Grant, 0)
  1193. for rows.Next() {
  1194. var topic string
  1195. var read, write, provisioned bool
  1196. if err := rows.Scan(&topic, &read, &write, &provisioned); err != nil {
  1197. return nil, err
  1198. } else if err := rows.Err(); err != nil {
  1199. return nil, err
  1200. }
  1201. grants = append(grants, Grant{
  1202. TopicPattern: fromSQLWildcard(topic),
  1203. Permission: NewPermission(read, write),
  1204. Provisioned: provisioned,
  1205. })
  1206. }
  1207. return grants, nil
  1208. }
  1209. // Reservations returns all user-owned topics, and the associated everyone-access
  1210. func (a *Manager) Reservations(username string) ([]Reservation, error) {
  1211. rows, err := a.db.Query(selectUserReservationsQuery, Everyone, username)
  1212. if err != nil {
  1213. return nil, err
  1214. }
  1215. defer rows.Close()
  1216. reservations := make([]Reservation, 0)
  1217. for rows.Next() {
  1218. var topic string
  1219. var ownerRead, ownerWrite bool
  1220. var everyoneRead, everyoneWrite sql.NullBool
  1221. if err := rows.Scan(&topic, &ownerRead, &ownerWrite, &everyoneRead, &everyoneWrite); err != nil {
  1222. return nil, err
  1223. } else if err := rows.Err(); err != nil {
  1224. return nil, err
  1225. }
  1226. reservations = append(reservations, Reservation{
  1227. Topic: unescapeUnderscore(topic),
  1228. Owner: NewPermission(ownerRead, ownerWrite),
  1229. Everyone: NewPermission(everyoneRead.Bool, everyoneWrite.Bool), // false if null
  1230. })
  1231. }
  1232. return reservations, nil
  1233. }
  1234. // HasReservation returns true if the given topic access is owned by the user
  1235. func (a *Manager) HasReservation(username, topic string) (bool, error) {
  1236. rows, err := a.db.Query(selectUserHasReservationQuery, username, escapeUnderscore(topic))
  1237. if err != nil {
  1238. return false, err
  1239. }
  1240. defer rows.Close()
  1241. if !rows.Next() {
  1242. return false, errNoRows
  1243. }
  1244. var count int64
  1245. if err := rows.Scan(&count); err != nil {
  1246. return false, err
  1247. }
  1248. return count > 0, nil
  1249. }
  1250. // ReservationsCount returns the number of reservations owned by this user
  1251. func (a *Manager) ReservationsCount(username string) (int64, error) {
  1252. rows, err := a.db.Query(selectUserReservationsCountQuery, username)
  1253. if err != nil {
  1254. return 0, err
  1255. }
  1256. defer rows.Close()
  1257. if !rows.Next() {
  1258. return 0, errNoRows
  1259. }
  1260. var count int64
  1261. if err := rows.Scan(&count); err != nil {
  1262. return 0, err
  1263. }
  1264. return count, nil
  1265. }
  1266. // ReservationOwner returns user ID of the user that owns this topic, or an
  1267. // empty string if it's not owned by anyone
  1268. func (a *Manager) ReservationOwner(topic string) (string, error) {
  1269. rows, err := a.db.Query(selectUserReservationsOwnerQuery, escapeUnderscore(topic))
  1270. if err != nil {
  1271. return "", err
  1272. }
  1273. defer rows.Close()
  1274. if !rows.Next() {
  1275. return "", nil
  1276. }
  1277. var ownerUserID string
  1278. if err := rows.Scan(&ownerUserID); err != nil {
  1279. return "", err
  1280. }
  1281. return ownerUserID, nil
  1282. }
  1283. // ChangePassword changes a user's password
  1284. func (a *Manager) ChangePassword(username, password string, hashed bool) error {
  1285. return execTx(a.db, func(tx *sql.Tx) error {
  1286. return a.changePasswordTx(tx, username, password, hashed)
  1287. })
  1288. }
  1289. func (a *Manager) changePasswordTx(tx *sql.Tx, username, password string, hashed bool) error {
  1290. var hash string
  1291. var err error
  1292. if hashed {
  1293. hash = password
  1294. if err := ValidPasswordHash(hash); err != nil {
  1295. return err
  1296. }
  1297. } else {
  1298. hash, err = hashPassword(password, a.config.BcryptCost)
  1299. if err != nil {
  1300. return err
  1301. }
  1302. }
  1303. if _, err := tx.Exec(updateUserPassQuery, hash, username); err != nil {
  1304. return err
  1305. }
  1306. return nil
  1307. }
  1308. // ChangeRole changes a user's role. When a role is changed from RoleUser to RoleAdmin,
  1309. // all existing access control entries (Grant) are removed, since they are no longer needed.
  1310. func (a *Manager) ChangeRole(username string, role Role) error {
  1311. return execTx(a.db, func(tx *sql.Tx) error {
  1312. return a.changeRoleTx(tx, username, role)
  1313. })
  1314. }
  1315. func (a *Manager) changeRoleTx(tx *sql.Tx, username string, role Role) error {
  1316. if !AllowedUsername(username) || !AllowedRole(role) {
  1317. return ErrInvalidArgument
  1318. }
  1319. if _, err := tx.Exec(updateUserRoleQuery, string(role), username); err != nil {
  1320. return err
  1321. }
  1322. if role == RoleAdmin {
  1323. if _, err := tx.Exec(deleteUserAccessQuery, username, username); err != nil {
  1324. return err
  1325. }
  1326. }
  1327. return nil
  1328. }
  1329. // ChangeProvisioned changes the provisioned status of a user. This is used to mark users as
  1330. // provisioned. A provisioned user is a user defined in the config file.
  1331. func (a *Manager) ChangeProvisioned(username string, provisioned bool) error {
  1332. return execTx(a.db, func(tx *sql.Tx) error {
  1333. return a.changeProvisionedTx(tx, username, provisioned)
  1334. })
  1335. }
  1336. func (a *Manager) changeProvisionedTx(tx *sql.Tx, username string, provisioned bool) error {
  1337. if _, err := tx.Exec(updateUserProvisionedQuery, provisioned, username); err != nil {
  1338. return err
  1339. }
  1340. return nil
  1341. }
  1342. // ChangeTier changes a user's tier using the tier code. This function does not delete reservations, messages,
  1343. // or attachments, even if the new tier has lower limits in this regard. That has to be done elsewhere.
  1344. func (a *Manager) ChangeTier(username, tier string) error {
  1345. if !AllowedUsername(username) {
  1346. return ErrInvalidArgument
  1347. }
  1348. t, err := a.Tier(tier)
  1349. if err != nil {
  1350. return err
  1351. } else if err := a.checkReservationsLimit(username, t.ReservationLimit); err != nil {
  1352. return err
  1353. }
  1354. if _, err := a.db.Exec(updateUserTierQuery, tier, username); err != nil {
  1355. return err
  1356. }
  1357. return nil
  1358. }
  1359. // ResetTier removes the tier from the given user
  1360. func (a *Manager) ResetTier(username string) error {
  1361. if !AllowedUsername(username) && username != Everyone && username != "" {
  1362. return ErrInvalidArgument
  1363. } else if err := a.checkReservationsLimit(username, 0); err != nil {
  1364. return err
  1365. }
  1366. _, err := a.db.Exec(deleteUserTierQuery, username)
  1367. return err
  1368. }
  1369. func (a *Manager) checkReservationsLimit(username string, reservationsLimit int64) error {
  1370. u, err := a.User(username)
  1371. if err != nil {
  1372. return err
  1373. }
  1374. if u.Tier != nil && reservationsLimit < u.Tier.ReservationLimit {
  1375. reservations, err := a.Reservations(username)
  1376. if err != nil {
  1377. return err
  1378. } else if int64(len(reservations)) > reservationsLimit {
  1379. return ErrTooManyReservations
  1380. }
  1381. }
  1382. return nil
  1383. }
  1384. // AllowReservation tests if a user may create an access control entry for the given topic.
  1385. // If there are any ACL entries that are not owned by the user, an error is returned.
  1386. func (a *Manager) AllowReservation(username string, topic string) error {
  1387. if (!AllowedUsername(username) && username != Everyone) || !AllowedTopic(topic) {
  1388. return ErrInvalidArgument
  1389. }
  1390. rows, err := a.db.Query(selectOtherAccessCountQuery, escapeUnderscore(topic), escapeUnderscore(topic), username)
  1391. if err != nil {
  1392. return err
  1393. }
  1394. defer rows.Close()
  1395. if !rows.Next() {
  1396. return errNoRows
  1397. }
  1398. var otherCount int
  1399. if err := rows.Scan(&otherCount); err != nil {
  1400. return err
  1401. }
  1402. if otherCount > 0 {
  1403. return errTopicOwnedByOthers
  1404. }
  1405. return nil
  1406. }
  1407. // AllowAccess adds or updates an entry in th access control list for a specific user. It controls
  1408. // read/write access to a topic. The parameter topicPattern may include wildcards (*). The ACL entry
  1409. // owner may either be a user (username), or the system (empty).
  1410. func (a *Manager) AllowAccess(username string, topicPattern string, permission Permission) error {
  1411. return execTx(a.db, func(tx *sql.Tx) error {
  1412. return a.allowAccessTx(tx, username, topicPattern, permission, false)
  1413. })
  1414. }
  1415. func (a *Manager) allowAccessTx(tx *sql.Tx, username string, topicPattern string, permission Permission, provisioned bool) error {
  1416. if !AllowedUsername(username) && username != Everyone {
  1417. return ErrInvalidArgument
  1418. } else if !AllowedTopicPattern(topicPattern) {
  1419. return ErrInvalidArgument
  1420. }
  1421. owner := ""
  1422. if _, err := tx.Exec(upsertUserAccessQuery, username, toSQLWildcard(topicPattern), permission.IsRead(), permission.IsWrite(), owner, owner, provisioned); err != nil {
  1423. return err
  1424. }
  1425. return nil
  1426. }
  1427. // ResetAccess removes an access control list entry for a specific username/topic, or (if topic is
  1428. // empty) for an entire user. The parameter topicPattern may include wildcards (*).
  1429. func (a *Manager) ResetAccess(username string, topicPattern string) error {
  1430. return execTx(a.db, func(tx *sql.Tx) error {
  1431. return a.resetAccessTx(tx, username, topicPattern)
  1432. })
  1433. }
  1434. func (a *Manager) resetAccessTx(tx *sql.Tx, username string, topicPattern string) error {
  1435. if !AllowedUsername(username) && username != Everyone && username != "" {
  1436. return ErrInvalidArgument
  1437. } else if !AllowedTopicPattern(topicPattern) && topicPattern != "" {
  1438. return ErrInvalidArgument
  1439. }
  1440. if username == "" && topicPattern == "" {
  1441. _, err := tx.Exec(deleteAllAccessQuery, username)
  1442. return err
  1443. } else if topicPattern == "" {
  1444. _, err := tx.Exec(deleteUserAccessQuery, username, username)
  1445. return err
  1446. }
  1447. _, err := tx.Exec(deleteTopicAccessQuery, username, username, toSQLWildcard(topicPattern))
  1448. return err
  1449. }
  1450. // AddReservation creates two access control entries for the given topic: one with full read/write access for the
  1451. // given user, and one for Everyone with the permission passed as everyone. The user also owns the entries, and
  1452. // can modify or delete them.
  1453. func (a *Manager) AddReservation(username string, topic string, everyone Permission) error {
  1454. if !AllowedUsername(username) || username == Everyone || !AllowedTopic(topic) {
  1455. return ErrInvalidArgument
  1456. }
  1457. tx, err := a.db.Begin()
  1458. if err != nil {
  1459. return err
  1460. }
  1461. defer tx.Rollback()
  1462. if _, err := tx.Exec(upsertUserAccessQuery, username, escapeUnderscore(topic), true, true, username, username, false); err != nil {
  1463. return err
  1464. }
  1465. if _, err := tx.Exec(upsertUserAccessQuery, Everyone, escapeUnderscore(topic), everyone.IsRead(), everyone.IsWrite(), username, username, false); err != nil {
  1466. return err
  1467. }
  1468. return tx.Commit()
  1469. }
  1470. // RemoveReservations deletes the access control entries associated with the given username/topic, as
  1471. // well as all entries with Everyone/topic. This is the counterpart for AddReservation.
  1472. func (a *Manager) RemoveReservations(username string, topics ...string) error {
  1473. if !AllowedUsername(username) || username == Everyone || len(topics) == 0 {
  1474. return ErrInvalidArgument
  1475. }
  1476. for _, topic := range topics {
  1477. if !AllowedTopic(topic) {
  1478. return ErrInvalidArgument
  1479. }
  1480. }
  1481. tx, err := a.db.Begin()
  1482. if err != nil {
  1483. return err
  1484. }
  1485. defer tx.Rollback()
  1486. for _, topic := range topics {
  1487. if _, err := tx.Exec(deleteTopicAccessQuery, username, username, escapeUnderscore(topic)); err != nil {
  1488. return err
  1489. }
  1490. if _, err := tx.Exec(deleteTopicAccessQuery, Everyone, Everyone, escapeUnderscore(topic)); err != nil {
  1491. return err
  1492. }
  1493. }
  1494. return tx.Commit()
  1495. }
  1496. // DefaultAccess returns the default read/write access if no access control entry matches
  1497. func (a *Manager) DefaultAccess() Permission {
  1498. return a.config.DefaultAccess
  1499. }
  1500. // AddTier creates a new tier in the database
  1501. func (a *Manager) AddTier(tier *Tier) error {
  1502. if tier.ID == "" {
  1503. tier.ID = util.RandomStringPrefix(tierIDPrefix, tierIDLength)
  1504. }
  1505. if _, err := a.db.Exec(insertTierQuery, tier.ID, tier.Code, tier.Name, tier.MessageLimit, int64(tier.MessageExpiryDuration.Seconds()), tier.EmailLimit, tier.CallLimit, tier.ReservationLimit, tier.AttachmentFileSizeLimit, tier.AttachmentTotalSizeLimit, int64(tier.AttachmentExpiryDuration.Seconds()), tier.AttachmentBandwidthLimit, nullString(tier.StripeMonthlyPriceID), nullString(tier.StripeYearlyPriceID)); err != nil {
  1506. return err
  1507. }
  1508. return nil
  1509. }
  1510. // UpdateTier updates a tier's properties in the database
  1511. func (a *Manager) UpdateTier(tier *Tier) error {
  1512. if _, err := a.db.Exec(updateTierQuery, tier.Name, tier.MessageLimit, int64(tier.MessageExpiryDuration.Seconds()), tier.EmailLimit, tier.CallLimit, tier.ReservationLimit, tier.AttachmentFileSizeLimit, tier.AttachmentTotalSizeLimit, int64(tier.AttachmentExpiryDuration.Seconds()), tier.AttachmentBandwidthLimit, nullString(tier.StripeMonthlyPriceID), nullString(tier.StripeYearlyPriceID), tier.Code); err != nil {
  1513. return err
  1514. }
  1515. return nil
  1516. }
  1517. // RemoveTier deletes the tier with the given code
  1518. func (a *Manager) RemoveTier(code string) error {
  1519. if !AllowedTier(code) {
  1520. return ErrInvalidArgument
  1521. }
  1522. // This fails if any user has this tier
  1523. if _, err := a.db.Exec(deleteTierQuery, code); err != nil {
  1524. return err
  1525. }
  1526. return nil
  1527. }
  1528. // ChangeBilling updates a user's billing fields, namely the Stripe customer ID, and subscription information
  1529. func (a *Manager) ChangeBilling(username string, billing *Billing) error {
  1530. if _, err := a.db.Exec(updateBillingQuery, nullString(billing.StripeCustomerID), nullString(billing.StripeSubscriptionID), nullString(string(billing.StripeSubscriptionStatus)), nullString(string(billing.StripeSubscriptionInterval)), nullInt64(billing.StripeSubscriptionPaidUntil.Unix()), nullInt64(billing.StripeSubscriptionCancelAt.Unix()), username); err != nil {
  1531. return err
  1532. }
  1533. return nil
  1534. }
  1535. // Tiers returns a list of all Tier structs
  1536. func (a *Manager) Tiers() ([]*Tier, error) {
  1537. rows, err := a.db.Query(selectTiersQuery)
  1538. if err != nil {
  1539. return nil, err
  1540. }
  1541. defer rows.Close()
  1542. tiers := make([]*Tier, 0)
  1543. for {
  1544. tier, err := a.readTier(rows)
  1545. if err == ErrTierNotFound {
  1546. break
  1547. } else if err != nil {
  1548. return nil, err
  1549. }
  1550. tiers = append(tiers, tier)
  1551. }
  1552. return tiers, nil
  1553. }
  1554. // Tier returns a Tier based on the code, or ErrTierNotFound if it does not exist
  1555. func (a *Manager) Tier(code string) (*Tier, error) {
  1556. rows, err := a.db.Query(selectTierByCodeQuery, code)
  1557. if err != nil {
  1558. return nil, err
  1559. }
  1560. defer rows.Close()
  1561. return a.readTier(rows)
  1562. }
  1563. // TierByStripePrice returns a Tier based on the Stripe price ID, or ErrTierNotFound if it does not exist
  1564. func (a *Manager) TierByStripePrice(priceID string) (*Tier, error) {
  1565. rows, err := a.db.Query(selectTierByPriceIDQuery, priceID, priceID)
  1566. if err != nil {
  1567. return nil, err
  1568. }
  1569. defer rows.Close()
  1570. return a.readTier(rows)
  1571. }
  1572. func (a *Manager) readTier(rows *sql.Rows) (*Tier, error) {
  1573. var id, code, name string
  1574. var stripeMonthlyPriceID, stripeYearlyPriceID sql.NullString
  1575. var messagesLimit, messagesExpiryDuration, emailsLimit, callsLimit, reservationsLimit, attachmentFileSizeLimit, attachmentTotalSizeLimit, attachmentExpiryDuration, attachmentBandwidthLimit sql.NullInt64
  1576. if !rows.Next() {
  1577. return nil, ErrTierNotFound
  1578. }
  1579. if err := rows.Scan(&id, &code, &name, &messagesLimit, &messagesExpiryDuration, &emailsLimit, &callsLimit, &reservationsLimit, &attachmentFileSizeLimit, &attachmentTotalSizeLimit, &attachmentExpiryDuration, &attachmentBandwidthLimit, &stripeMonthlyPriceID, &stripeYearlyPriceID); err != nil {
  1580. return nil, err
  1581. } else if err := rows.Err(); err != nil {
  1582. return nil, err
  1583. }
  1584. // When changed, note readUser() as well
  1585. return &Tier{
  1586. ID: id,
  1587. Code: code,
  1588. Name: name,
  1589. MessageLimit: messagesLimit.Int64,
  1590. MessageExpiryDuration: time.Duration(messagesExpiryDuration.Int64) * time.Second,
  1591. EmailLimit: emailsLimit.Int64,
  1592. CallLimit: callsLimit.Int64,
  1593. ReservationLimit: reservationsLimit.Int64,
  1594. AttachmentFileSizeLimit: attachmentFileSizeLimit.Int64,
  1595. AttachmentTotalSizeLimit: attachmentTotalSizeLimit.Int64,
  1596. AttachmentExpiryDuration: time.Duration(attachmentExpiryDuration.Int64) * time.Second,
  1597. AttachmentBandwidthLimit: attachmentBandwidthLimit.Int64,
  1598. StripeMonthlyPriceID: stripeMonthlyPriceID.String, // May be empty
  1599. StripeYearlyPriceID: stripeYearlyPriceID.String, // May be empty
  1600. }, nil
  1601. }
  1602. // Close closes the underlying database
  1603. func (a *Manager) Close() error {
  1604. return a.db.Close()
  1605. }
  1606. func (a *Manager) maybeProvisionUsersAndAccess() error {
  1607. if !a.config.ProvisionEnabled {
  1608. return nil
  1609. }
  1610. users, err := a.Users()
  1611. if err != nil {
  1612. return err
  1613. }
  1614. provisionUsernames := util.Map(a.config.Users, func(u *User) string {
  1615. return u.Name
  1616. })
  1617. return execTx(a.db, func(tx *sql.Tx) error {
  1618. // Remove users that are provisioned, but not in the config anymore
  1619. for _, user := range users {
  1620. if user.Name == Everyone {
  1621. continue
  1622. } else if user.Provisioned && !util.Contains(provisionUsernames, user.Name) {
  1623. if err := a.removeUserTx(tx, user.Name); err != nil {
  1624. return fmt.Errorf("failed to remove provisioned user %s: %v", user.Name, err)
  1625. }
  1626. }
  1627. }
  1628. // Add or update provisioned users
  1629. for _, user := range a.config.Users {
  1630. if user.Name == Everyone {
  1631. continue
  1632. }
  1633. existingUser, exists := util.Find(users, func(u *User) bool {
  1634. return u.Name == user.Name
  1635. })
  1636. if !exists {
  1637. if err := a.addUserTx(tx, user.Name, user.Hash, user.Role, true, true); err != nil && !errors.Is(err, ErrUserExists) {
  1638. return fmt.Errorf("failed to add provisioned user %s: %v", user.Name, err)
  1639. }
  1640. } else {
  1641. if !existingUser.Provisioned {
  1642. if err := a.changeProvisionedTx(tx, user.Name, true); err != nil {
  1643. return fmt.Errorf("failed to change provisioned status for user %s: %v", user.Name, err)
  1644. }
  1645. }
  1646. if existingUser.Hash != user.Hash {
  1647. if err := a.changePasswordTx(tx, user.Name, user.Hash, true); err != nil {
  1648. return fmt.Errorf("failed to change password for provisioned user %s: %v", user.Name, err)
  1649. }
  1650. }
  1651. if existingUser.Role != user.Role {
  1652. if err := a.changeRoleTx(tx, user.Name, user.Role); err != nil {
  1653. return fmt.Errorf("failed to change role for provisioned user %s: %v", user.Name, err)
  1654. }
  1655. }
  1656. }
  1657. }
  1658. // Remove and (re-)add provisioned grants
  1659. if _, err := tx.Exec(deleteUserAccessProvisionedQuery); err != nil {
  1660. return err
  1661. }
  1662. for username, grants := range a.config.Access {
  1663. user, exists := util.Find(a.config.Users, func(u *User) bool {
  1664. return u.Name == username
  1665. })
  1666. if !exists && username != Everyone {
  1667. return fmt.Errorf("user %s is not a provisioned user, refusing to add ACL entry", username)
  1668. } else if user != nil && user.Role == RoleAdmin {
  1669. return fmt.Errorf("adding access control entries is not allowed for admin roles for user %s", username)
  1670. }
  1671. for _, grant := range grants {
  1672. if err := a.resetAccessTx(tx, username, grant.TopicPattern); err != nil {
  1673. return fmt.Errorf("failed to reset access for user %s and topic %s: %v", username, grant.TopicPattern, err)
  1674. }
  1675. if err := a.allowAccessTx(tx, username, grant.TopicPattern, grant.Permission, true); err != nil {
  1676. return err
  1677. }
  1678. }
  1679. }
  1680. // Remove and (re-)add provisioned tokens
  1681. if _, err := tx.Exec(deleteTokensProvisionedQuery); err != nil {
  1682. return err
  1683. }
  1684. for username, tokens := range a.config.Tokens {
  1685. _, exists := util.Find(a.config.Users, func(u *User) bool {
  1686. return u.Name == username
  1687. })
  1688. if !exists && username != Everyone {
  1689. return fmt.Errorf("user %s is not a provisioned user, refusing to add tokens", username)
  1690. }
  1691. var userID string
  1692. row := tx.QueryRow(selectUserIDFromUsernameQuery, username)
  1693. if err := row.Scan(&userID); err != nil {
  1694. return fmt.Errorf("failed to find provisioned user %s for provisioned tokens", username)
  1695. }
  1696. for _, token := range tokens {
  1697. if _, err = a.createTokenTx(tx, userID, token.Value, token.Label, time.Unix(0, 0), netip.IPv4Unspecified(), true); err != nil {
  1698. return err
  1699. }
  1700. }
  1701. }
  1702. return nil
  1703. })
  1704. }
  // toSQLWildcard turns a topic pattern into a SQL LIKE pattern: every '*' becomes '%', and
  // every '_' is escaped as '\_', assuming '\' as the escape character.
  func toSQLWildcard(s string) string {
  	withPercent := strings.ReplaceAll(s, "*", "%")
  	return escapeUnderscore(withPercent)
  }

  // fromSQLWildcard is the reverse of toSQLWildcard: it removes the '\' in front of every
  // escaped underscore, and turns every '%' into '*'.
  func fromSQLWildcard(s string) string {
  	unescaped := unescapeUnderscore(s)
  	return strings.ReplaceAll(unescaped, "%", "*")
  }

  // escapeUnderscore prefixes every '_' in s with a backslash.
  func escapeUnderscore(s string) string {
  	const plain, escaped = "_", "\\_"
  	return strings.ReplaceAll(s, plain, escaped)
  }

  // unescapeUnderscore replaces every '\_' in s with a plain '_'.
  func unescapeUnderscore(s string) string {
  	const plain, escaped = "_", "\\_"
  	return strings.ReplaceAll(s, escaped, plain)
  }
  1721. func runStartupQueries(db *sql.DB, startupQueries string) error {
  1722. if _, err := db.Exec(startupQueries); err != nil {
  1723. return err
  1724. }
  1725. if _, err := db.Exec(builtinStartupQueries); err != nil {
  1726. return err
  1727. }
  1728. return nil
  1729. }
  1730. func setupDB(db *sql.DB) error {
  1731. // If 'schemaVersion' table does not exist, this must be a new database
  1732. rowsSV, err := db.Query(selectSchemaVersionQuery)
  1733. if err != nil {
  1734. return setupNewDB(db)
  1735. }
  1736. defer rowsSV.Close()
  1737. // If 'schemaVersion' table exists, read version and potentially upgrade
  1738. schemaVersion := 0
  1739. if !rowsSV.Next() {
  1740. return errors.New("cannot determine schema version: database file may be corrupt")
  1741. }
  1742. if err := rowsSV.Scan(&schemaVersion); err != nil {
  1743. return err
  1744. }
  1745. rowsSV.Close()
  1746. // Do migrations
  1747. if schemaVersion == currentSchemaVersion {
  1748. return nil
  1749. } else if schemaVersion > currentSchemaVersion {
  1750. return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, currentSchemaVersion)
  1751. }
  1752. for i := schemaVersion; i < currentSchemaVersion; i++ {
  1753. fn, ok := migrations[i]
  1754. if !ok {
  1755. return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
  1756. } else if err := fn(db); err != nil {
  1757. return err
  1758. }
  1759. }
  1760. return nil
  1761. }
  1762. func setupNewDB(db *sql.DB) error {
  1763. if _, err := db.Exec(createTablesQueries); err != nil {
  1764. return err
  1765. }
  1766. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  1767. return err
  1768. }
  1769. return nil
  1770. }
  1771. func migrateFrom1(db *sql.DB) error {
  1772. log.Tag(tag).Info("Migrating user database schema: from 1 to 2")
  1773. tx, err := db.Begin()
  1774. if err != nil {
  1775. return err
  1776. }
  1777. defer tx.Rollback()
  1778. // Rename user -> user_old, and create new tables
  1779. if _, err := tx.Exec(migrate1To2CreateTablesQueries); err != nil {
  1780. return err
  1781. }
  1782. // Insert users from user_old into new user table, with ID and sync_topic
  1783. rows, err := tx.Query(migrate1To2SelectAllOldUsernamesNoTx)
  1784. if err != nil {
  1785. return err
  1786. }
  1787. defer rows.Close()
  1788. usernames := make([]string, 0)
  1789. for rows.Next() {
  1790. var username string
  1791. if err := rows.Scan(&username); err != nil {
  1792. return err
  1793. }
  1794. usernames = append(usernames, username)
  1795. }
  1796. if err := rows.Close(); err != nil {
  1797. return err
  1798. }
  1799. for _, username := range usernames {
  1800. userID := util.RandomStringPrefix(userIDPrefix, userIDLength)
  1801. syncTopic := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength)
  1802. if _, err := tx.Exec(migrate1To2InsertUserNoTx, userID, syncTopic, username); err != nil {
  1803. return err
  1804. }
  1805. }
  1806. // Migrate old "access" table to "user_access" and drop "access" and "user_old"
  1807. if _, err := tx.Exec(migrate1To2InsertFromOldTablesAndDropNoTx); err != nil {
  1808. return err
  1809. }
  1810. if _, err := tx.Exec(updateSchemaVersion, 2); err != nil {
  1811. return err
  1812. }
  1813. if err := tx.Commit(); err != nil {
  1814. return err
  1815. }
  1816. return nil
  1817. }
  1818. func migrateFrom2(db *sql.DB) error {
  1819. log.Tag(tag).Info("Migrating user database schema: from 2 to 3")
  1820. tx, err := db.Begin()
  1821. if err != nil {
  1822. return err
  1823. }
  1824. defer tx.Rollback()
  1825. if _, err := tx.Exec(migrate2To3UpdateQueries); err != nil {
  1826. return err
  1827. }
  1828. if _, err := tx.Exec(updateSchemaVersion, 3); err != nil {
  1829. return err
  1830. }
  1831. return tx.Commit()
  1832. }
  1833. func migrateFrom3(db *sql.DB) error {
  1834. log.Tag(tag).Info("Migrating user database schema: from 3 to 4")
  1835. tx, err := db.Begin()
  1836. if err != nil {
  1837. return err
  1838. }
  1839. defer tx.Rollback()
  1840. if _, err := tx.Exec(migrate3To4UpdateQueries); err != nil {
  1841. return err
  1842. }
  1843. if _, err := tx.Exec(updateSchemaVersion, 4); err != nil {
  1844. return err
  1845. }
  1846. return tx.Commit()
  1847. }
  1848. func migrateFrom4(db *sql.DB) error {
  1849. log.Tag(tag).Info("Migrating user database schema: from 4 to 5")
  1850. tx, err := db.Begin()
  1851. if err != nil {
  1852. return err
  1853. }
  1854. defer tx.Rollback()
  1855. if _, err := tx.Exec(migrate4To5UpdateQueries); err != nil {
  1856. return err
  1857. }
  1858. if _, err := tx.Exec(updateSchemaVersion, 5); err != nil {
  1859. return err
  1860. }
  1861. return tx.Commit()
  1862. }
  1863. func migrateFrom5(db *sql.DB) error {
  1864. log.Tag(tag).Info("Migrating user database schema: from 5 to 6")
  1865. tx, err := db.Begin()
  1866. if err != nil {
  1867. return err
  1868. }
  1869. defer tx.Rollback()
  1870. if _, err := tx.Exec(migrate5To6UpdateQueries); err != nil {
  1871. return err
  1872. }
  1873. if _, err := tx.Exec(updateSchemaVersion, 6); err != nil {
  1874. return err
  1875. }
  1876. return tx.Commit()
  1877. }
  // nullString converts s to a sql.NullString. The empty string yields the zero value
  // (not valid, i.e. NULL); any other string yields a valid value.
  func nullString(s string) sql.NullString {
  	return sql.NullString{String: s, Valid: s != ""}
  }
  // nullInt64 converts v to a sql.NullInt64. Zero yields the zero value (not valid, i.e. NULL);
  // any other number, including negative ones, yields a valid value.
  func nullInt64(v int64) sql.NullInt64 {
  	return sql.NullInt64{Int64: v, Valid: v != 0}
  }
  // execTx runs f inside a new transaction. The transaction is committed if f returns nil, and
  // rolled back if f returns an error, in which case that error is returned. Errors from beginning
  // or committing the transaction are returned as well.
  func execTx(db *sql.DB, f func(tx *sql.Tx) error) error {
  	tx, beginErr := db.Begin()
  	if beginErr != nil {
  		return beginErr
  	}
  	// Rolls back on the error path; the returned error is deliberately ignored
  	defer tx.Rollback()
  	runErr := f(tx)
  	if runErr == nil {
  		return tx.Commit()
  	}
  	return runErr
  }
  // queryTx runs f inside a new transaction and passes its result through. The transaction is
  // committed if f returns a nil error, and rolled back otherwise. If the transaction cannot be
  // started, the zero value of T is returned; in all other cases the value returned by f is
  // returned, together with the error of f or of the commit (if any).
  func queryTx[T any](db *sql.DB, f func(tx *sql.Tx) (T, error)) (T, error) {
  	tx, beginErr := db.Begin()
  	if beginErr != nil {
  		var zero T
  		return zero, beginErr
  	}
  	// Rolls back on the error path; the returned error is deliberately ignored
  	defer tx.Rollback()
  	result, err := f(tx)
  	if err == nil {
  		err = tx.Commit()
  	}
  	return result, err
  }