manager.go 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961
  1. // Package user deals with authentication and authorization against topics
  2. package user
  3. import (
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/mattn/go-sqlite3"
  9. "github.com/stripe/stripe-go/v74"
  10. "golang.org/x/crypto/bcrypt"
  11. "heckel.io/ntfy/v2/log"
  12. "heckel.io/ntfy/v2/util"
  13. "net/netip"
  14. "path/filepath"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
// Internal constants: identifier formats, authentication hardening, and housekeeping limits.
const (
	// Prefix and length used for tier IDs.
	// NOTE(review): the ID generator is not in this view; whether the length includes the prefix should be confirmed there.
	tierIDPrefix = "ti_"
	tierIDLength = 8

	// Prefix and length used for per-user sync topics (same caveat as above).
	syncTopicPrefix = "st_"
	syncTopicLength = 16

	// Prefix and length used for user IDs (same caveat as above).
	userIDPrefix = "u_"
	userIDLength = 12

	// Fixed bcrypt hash that Authenticate compares against when it cannot check a real password hash
	// (unknown or deleted user), so that those failures take about as long as a real comparison.
	userAuthIntentionalSlowDownHash = "$2a$10$YFCQvqQDwIIwnJM1xkAYOeih0dg17UVGanaTStnrSzC8NCWxcLDwy" // Cost should match DefaultUserPasswordBcryptCost

	// NOTE(review): presumably the grace period between marking a user deleted and removing the row
	// (see deleteUsersMarkedQuery); the code that uses it is not in this view.
	userHardDeleteAfterDuration = 7 * 24 * time.Hour

	// Prefix, length and per-user cap for access tokens.
	tokenPrefix   = "tk_"
	tokenLength   = 32
	tokenMaxCount = 60 // Only keep this many tokens in the table per user

	// Log tag attached to this file's log events via log.Tag(tag).
	tag = "user_manager"
)
// Default constants that may be overridden by configs
const (
	// DefaultUserStatsQueueWriterInterval is the value NewManager assigns to
	// Config.QueueWriterInterval when that field is zero or negative.
	DefaultUserStatsQueueWriterInterval = 33 * time.Second

	// DefaultUserPasswordBcryptCost is the value NewManager assigns to Config.BcryptCost
	// when that field is zero or negative.
	DefaultUserPasswordBcryptCost = 10
)
// Package-internal sentinel errors. The code paths that return them are outside this view.
var (
	errNoTokenProvided    = errors.New("no token provided")
	errTopicOwnedByOthers = errors.New("topic owned by others")
	errNoRows             = errors.New("no rows found")
)
// Manager-related queries
const (
	// createTablesQueries creates the current schema (tier, user, user_access, user_token,
	// user_phone, schemaVersion) in a single transaction and inserts the "everyone" user ('*')
	// unless a row with that ID already exists.
	// NOTE(review): the CREATE UNIQUE INDEX statements have no IF NOT EXISTS, so this script
	// presumably only runs against a fresh database — confirm in setupDB.
	createTablesQueries = `
BEGIN;
CREATE TABLE IF NOT EXISTS tier (
id TEXT PRIMARY KEY,
code TEXT NOT NULL,
name TEXT NOT NULL,
messages_limit INT NOT NULL,
messages_expiry_duration INT NOT NULL,
emails_limit INT NOT NULL,
calls_limit INT NOT NULL,
reservations_limit INT NOT NULL,
attachment_file_size_limit INT NOT NULL,
attachment_total_size_limit INT NOT NULL,
attachment_expiry_duration INT NOT NULL,
attachment_bandwidth_limit INT NOT NULL,
stripe_monthly_price_id TEXT,
stripe_yearly_price_id TEXT
);
CREATE UNIQUE INDEX idx_tier_code ON tier (code);
CREATE UNIQUE INDEX idx_tier_stripe_monthly_price_id ON tier (stripe_monthly_price_id);
CREATE UNIQUE INDEX idx_tier_stripe_yearly_price_id ON tier (stripe_yearly_price_id);
CREATE TABLE IF NOT EXISTS user (
id TEXT PRIMARY KEY,
tier_id TEXT,
user TEXT NOT NULL,
pass TEXT NOT NULL,
role TEXT CHECK (role IN ('anonymous', 'admin', 'user')) NOT NULL,
prefs JSON NOT NULL DEFAULT '{}',
sync_topic TEXT NOT NULL,
provisioned INT NOT NULL,
stats_messages INT NOT NULL DEFAULT (0),
stats_emails INT NOT NULL DEFAULT (0),
stats_calls INT NOT NULL DEFAULT (0),
stripe_customer_id TEXT,
stripe_subscription_id TEXT,
stripe_subscription_status TEXT,
stripe_subscription_interval TEXT,
stripe_subscription_paid_until INT,
stripe_subscription_cancel_at INT,
created INT NOT NULL,
deleted INT,
FOREIGN KEY (tier_id) REFERENCES tier (id)
);
CREATE UNIQUE INDEX idx_user ON user (user);
CREATE UNIQUE INDEX idx_user_stripe_customer_id ON user (stripe_customer_id);
CREATE UNIQUE INDEX idx_user_stripe_subscription_id ON user (stripe_subscription_id);
CREATE TABLE IF NOT EXISTS user_access (
user_id TEXT NOT NULL,
topic TEXT NOT NULL,
read INT NOT NULL,
write INT NOT NULL,
owner_user_id INT,
provisioned INT NOT NULL,
PRIMARY KEY (user_id, topic),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS user_token (
user_id TEXT NOT NULL,
token TEXT NOT NULL,
label TEXT NOT NULL,
last_access INT NOT NULL,
last_origin TEXT NOT NULL,
expires INT NOT NULL,
PRIMARY KEY (user_id, token),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS user_phone (
user_id TEXT NOT NULL,
phone_number TEXT NOT NULL,
PRIMARY KEY (user_id, phone_number),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created)
VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, UNIXEPOCH())
ON CONFLICT (id) DO NOTHING;
COMMIT;
`

	// builtinStartupQueries turns on SQLite foreign key enforcement.
	builtinStartupQueries = `
PRAGMA foreign_keys = ON;
`

	// The four selectUserBy... queries return the same column list (user columns followed by
	// the columns of the LEFT JOINed tier, which are NULL for users without a tier) and differ
	// only in the lookup key: ID, username, token, or Stripe customer ID.
	selectUserByIDQuery = `
SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
FROM user u
LEFT JOIN tier t on t.id = u.tier_id
WHERE u.id = ?
`

	selectUserByNameQuery = `
SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
FROM user u
LEFT JOIN tier t on t.id = u.tier_id
WHERE user = ?
`

	// Token lookup: a token matches if its expires column is 0 or not before the supplied timestamp.
	selectUserByTokenQuery = `
SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
FROM user u
JOIN user_token tk on u.id = tk.user_id
LEFT JOIN tier t on t.id = u.tier_id
WHERE tk.token = ? AND (tk.expires = 0 OR tk.expires >= ?)
`

	selectUserByStripeCustomerIDQuery = `
SELECT u.id, u.user, u.pass, u.role, u.prefs, u.sync_topic, u.provisioned, u.stats_messages, u.stats_emails, u.stats_calls, u.stripe_customer_id, u.stripe_subscription_id, u.stripe_subscription_status, u.stripe_subscription_interval, u.stripe_subscription_paid_until, u.stripe_subscription_cancel_at, deleted, t.id, t.code, t.name, t.messages_limit, t.messages_expiry_duration, t.emails_limit, t.calls_limit, t.reservations_limit, t.attachment_file_size_limit, t.attachment_total_size_limit, t.attachment_expiry_duration, t.attachment_bandwidth_limit, t.stripe_monthly_price_id, t.stripe_yearly_price_id
FROM user u
LEFT JOIN tier t on t.id = u.tier_id
WHERE u.stripe_customer_id = ?
`

	// selectTopicPermsQuery returns the read/write flags of every ACL entry of either of two
	// usernames whose stored topic pattern LIKE-matches the given topic ('\' is the escape
	// character). Rows are ordered by username descending, then longest pattern first, then
	// write-enabled entries first.
	selectTopicPermsQuery = `
SELECT read, write
FROM user_access a
JOIN user u ON u.id = a.user_id
WHERE (u.user = ? OR u.user = ?) AND ? LIKE a.topic ESCAPE '\'
ORDER BY u.user DESC, LENGTH(a.topic) DESC, a.write DESC
`

	insertUserQuery = `
INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created)
VALUES (?, ?, ?, ?, ?, ?, ?)
`

	// selectUsernamesQuery lists all usernames: admins first, then regular users, then
	// anonymous, each group sorted by username.
	selectUsernamesQuery = `
SELECT user
FROM user
ORDER BY
CASE role
WHEN 'admin' THEN 1
WHEN 'anonymous' THEN 3
ELSE 2
END, user
`

	// Single-statement user queries. Note that some are keyed by username (user = ?) and
	// others by user ID (id = ?).
	selectUserCountQuery         = `SELECT COUNT(*) FROM user`
	updateUserPassQuery          = `UPDATE user SET pass = ? WHERE user = ?`
	updateUserRoleQuery          = `UPDATE user SET role = ? WHERE user = ?`
	updateUserProvisionedQuery   = `UPDATE user SET provisioned = ? WHERE user = ?`
	updateUserPrefsQuery         = `UPDATE user SET prefs = ? WHERE id = ?`
	updateUserStatsQuery         = `UPDATE user SET stats_messages = ?, stats_emails = ?, stats_calls = ? WHERE id = ?`
	updateUserStatsResetAllQuery = `UPDATE user SET stats_messages = 0, stats_emails = 0, stats_calls = 0`
	updateUserDeletedQuery       = `UPDATE user SET deleted = ? WHERE id = ?`
	deleteUsersMarkedQuery       = `DELETE FROM user WHERE deleted < ?`
	deleteUserQuery              = `DELETE FROM user WHERE user = ?`

	// upsertUserAccessQuery inserts an ACL entry for (username, topic), or overwrites the
	// read/write/owner/provisioned columns of the existing entry. An empty owner username is
	// stored as a NULL owner_user_id.
	upsertUserAccessQuery = `
INSERT INTO user_access (user_id, topic, read, write, owner_user_id, provisioned)
VALUES ((SELECT id FROM user WHERE user = ?), ?, ?, ?, (SELECT IIF(?='',NULL,(SELECT id FROM user WHERE user=?))), ?)
ON CONFLICT (user_id, topic)
DO UPDATE SET read=excluded.read, write=excluded.write, owner_user_id=excluded.owner_user_id, provisioned=excluded.provisioned
`

	selectUserAllAccessQuery = `
SELECT user_id, topic, read, write, provisioned
FROM user_access
ORDER BY LENGTH(topic) DESC, write DESC, read DESC, topic
`

	selectUserAccessQuery = `
SELECT topic, read, write, provisioned
FROM user_access
WHERE user_id = (SELECT id FROM user WHERE user = ?)
ORDER BY LENGTH(topic) DESC, write DESC, read DESC, topic
`

	// Reservation queries: the queries below select ACL rows where user_id equals
	// owner_user_id. selectUserReservationsQuery additionally LEFT JOINs the entry for the
	// same topic belonging to the user named by the first parameter.
	selectUserReservationsQuery = `
SELECT a_user.topic, a_user.read, a_user.write, a_everyone.read AS everyone_read, a_everyone.write AS everyone_write
FROM user_access a_user
LEFT JOIN user_access a_everyone ON a_user.topic = a_everyone.topic AND a_everyone.user_id = (SELECT id FROM user WHERE user = ?)
WHERE a_user.user_id = a_user.owner_user_id
AND a_user.owner_user_id = (SELECT id FROM user WHERE user = ?)
ORDER BY a_user.topic
`

	selectUserReservationsCountQuery = `
SELECT COUNT(*)
FROM user_access
WHERE user_id = owner_user_id
AND owner_user_id = (SELECT id FROM user WHERE user = ?)
`

	selectUserReservationsOwnerQuery = `
SELECT owner_user_id
FROM user_access
WHERE topic = ?
AND user_id = owner_user_id
`

	selectUserHasReservationQuery = `
SELECT COUNT(*)
FROM user_access
WHERE user_id = owner_user_id
AND owner_user_id = (SELECT id FROM user WHERE user = ?)
AND topic = ?
`

	// selectOtherAccessCountQuery counts ACL entries that equal or LIKE-match the topic and
	// are either unowned or owned by someone other than the given user.
	selectOtherAccessCountQuery = `
SELECT COUNT(*)
FROM user_access
WHERE (topic = ? OR ? LIKE topic ESCAPE '\')
AND (owner_user_id IS NULL OR owner_user_id != (SELECT id FROM user WHERE user = ?))
`

	deleteAllAccessQuery = `DELETE FROM user_access`

	// deleteUserAccessQuery removes every entry that belongs to, or is owned by, the given user.
	deleteUserAccessQuery = `
DELETE FROM user_access
WHERE user_id = (SELECT id FROM user WHERE user = ?)
OR owner_user_id = (SELECT id FROM user WHERE user = ?)
`

	deleteUserAccessProvisionedQuery = `DELETE FROM user_access WHERE provisioned = 1`

	deleteTopicAccessQuery = `
DELETE FROM user_access
WHERE (user_id = (SELECT id FROM user WHERE user = ?) OR owner_user_id = (SELECT id FROM user WHERE user = ?))
AND topic = ?
`

	// Token queries. updateTokenLastAccessQuery is keyed by token only; the others that
	// address a single token are keyed by (user_id, token).
	selectTokenCountQuery      = `SELECT COUNT(*) FROM user_token WHERE user_id = ?`
	selectTokensQuery          = `SELECT token, label, last_access, last_origin, expires FROM user_token WHERE user_id = ?`
	selectTokenQuery           = `SELECT token, label, last_access, last_origin, expires FROM user_token WHERE user_id = ? AND token = ?`
	insertTokenQuery           = `INSERT INTO user_token (user_id, token, label, last_access, last_origin, expires) VALUES (?, ?, ?, ?, ?, ?)`
	updateTokenExpiryQuery     = `UPDATE user_token SET expires = ? WHERE user_id = ? AND token = ?`
	updateTokenLabelQuery      = `UPDATE user_token SET label = ? WHERE user_id = ? AND token = ?`
	updateTokenLastAccessQuery = `UPDATE user_token SET last_access = ?, last_origin = ? WHERE token = ?`
	deleteTokenQuery           = `DELETE FROM user_token WHERE user_id = ? AND token = ?`
	deleteAllTokenQuery        = `DELETE FROM user_token WHERE user_id = ?`
	deleteExpiredTokensQuery   = `DELETE FROM user_token WHERE expires > 0 AND expires < ?`

	// deleteExcessTokensQuery keeps only the LIMIT rows of a user with the largest expires
	// value and deletes the rest.
	// NOTE(review): rows with expires = 0 sort last under ORDER BY expires DESC, so tokens
	// that selectUserByTokenQuery treats as always valid are the first to be pruned —
	// confirm this is intended.
	deleteExcessTokensQuery = `
DELETE FROM user_token
WHERE user_id = ?
AND (user_id, token) NOT IN (
SELECT user_id, token
FROM user_token
WHERE user_id = ?
ORDER BY expires DESC
LIMIT ?
)
`

	// Phone number queries
	selectPhoneNumbersQuery = `SELECT phone_number FROM user_phone WHERE user_id = ?`
	insertPhoneNumberQuery  = `INSERT INTO user_phone (user_id, phone_number) VALUES (?, ?)`
	deletePhoneNumberQuery  = `DELETE FROM user_phone WHERE user_id = ? AND phone_number = ?`

	// Tier queries. updateTierQuery addresses the tier by code and leaves id and code untouched.
	insertTierQuery = `
INSERT INTO tier (id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`

	updateTierQuery = `
UPDATE tier
SET name = ?, messages_limit = ?, messages_expiry_duration = ?, emails_limit = ?, calls_limit = ?, reservations_limit = ?, attachment_file_size_limit = ?, attachment_total_size_limit = ?, attachment_expiry_duration = ?, attachment_bandwidth_limit = ?, stripe_monthly_price_id = ?, stripe_yearly_price_id = ?
WHERE code = ?
`

	selectTiersQuery = `
SELECT id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id
FROM tier
`

	selectTierByCodeQuery = `
SELECT id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id
FROM tier
WHERE code = ?
`

	// selectTierByPriceIDQuery matches a price ID against both the monthly and the yearly column.
	selectTierByPriceIDQuery = `
SELECT id, code, name, messages_limit, messages_expiry_duration, emails_limit, calls_limit, reservations_limit, attachment_file_size_limit, attachment_total_size_limit, attachment_expiry_duration, attachment_bandwidth_limit, stripe_monthly_price_id, stripe_yearly_price_id
FROM tier
WHERE (stripe_monthly_price_id = ? OR stripe_yearly_price_id = ?)
`

	// User/tier assignment. deleteUserTierQuery clears the user's tier_id; it does not delete a tier.
	updateUserTierQuery = `UPDATE user SET tier_id = (SELECT id FROM tier WHERE code = ?) WHERE user = ?`
	deleteUserTierQuery = `UPDATE user SET tier_id = null WHERE user = ?`
	deleteTierQuery     = `DELETE FROM tier WHERE code = ?`

	// updateBillingQuery overwrites all Stripe-related columns of the user with the given username.
	updateBillingQuery = `
UPDATE user
SET stripe_customer_id = ?, stripe_subscription_id = ?, stripe_subscription_status = ?, stripe_subscription_interval = ?, stripe_subscription_paid_until = ?, stripe_subscription_cancel_at = ?
WHERE user = ?
`
)
// Schema management queries
const (
	// The schema version is kept in the single schemaVersion row with id = 1.
	currentSchemaVersion     = 6
	insertSchemaVersion      = `INSERT INTO schemaVersion VALUES (1, ?)`
	updateSchemaVersion      = `UPDATE schemaVersion SET version = ? WHERE id = 1`
	selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`

	// 1 -> 2 (complex migration!)
	// Renames the existing user table to user_old, creates the version-2 tables and indices,
	// and inserts the "everyone" user. The remaining migrate1To2... statements then copy
	// users and access entries from the old tables and drop them.
	migrate1To2CreateTablesQueries = `
ALTER TABLE user RENAME TO user_old;
CREATE TABLE IF NOT EXISTS tier (
id TEXT PRIMARY KEY,
code TEXT NOT NULL,
name TEXT NOT NULL,
messages_limit INT NOT NULL,
messages_expiry_duration INT NOT NULL,
emails_limit INT NOT NULL,
reservations_limit INT NOT NULL,
attachment_file_size_limit INT NOT NULL,
attachment_total_size_limit INT NOT NULL,
attachment_expiry_duration INT NOT NULL,
attachment_bandwidth_limit INT NOT NULL,
stripe_price_id TEXT
);
CREATE UNIQUE INDEX idx_tier_code ON tier (code);
CREATE UNIQUE INDEX idx_tier_price_id ON tier (stripe_price_id);
CREATE TABLE IF NOT EXISTS user (
id TEXT PRIMARY KEY,
tier_id TEXT,
user TEXT NOT NULL,
pass TEXT NOT NULL,
role TEXT CHECK (role IN ('anonymous', 'admin', 'user')) NOT NULL,
prefs JSON NOT NULL DEFAULT '{}',
sync_topic TEXT NOT NULL,
stats_messages INT NOT NULL DEFAULT (0),
stats_emails INT NOT NULL DEFAULT (0),
stripe_customer_id TEXT,
stripe_subscription_id TEXT,
stripe_subscription_status TEXT,
stripe_subscription_paid_until INT,
stripe_subscription_cancel_at INT,
created INT NOT NULL,
deleted INT,
FOREIGN KEY (tier_id) REFERENCES tier (id)
);
CREATE UNIQUE INDEX idx_user ON user (user);
CREATE UNIQUE INDEX idx_user_stripe_customer_id ON user (stripe_customer_id);
CREATE UNIQUE INDEX idx_user_stripe_subscription_id ON user (stripe_subscription_id);
CREATE TABLE IF NOT EXISTS user_access (
user_id TEXT NOT NULL,
topic TEXT NOT NULL,
read INT NOT NULL,
write INT NOT NULL,
owner_user_id INT,
PRIMARY KEY (user_id, topic),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS user_token (
user_id TEXT NOT NULL,
token TEXT NOT NULL,
label TEXT NOT NULL,
last_access INT NOT NULL,
last_origin TEXT NOT NULL,
expires INT NOT NULL,
PRIMARY KEY (user_id, token),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
INSERT INTO user (id, user, pass, role, sync_topic, created)
VALUES ('u_everyone', '*', '', 'anonymous', '', UNIXEPOCH())
ON CONFLICT (id) DO NOTHING;
`

	migrate1To2SelectAllOldUsernamesNoTx = `SELECT user FROM user_old`

	// Copies one user from user_old; the new ID and sync topic are supplied as parameters.
	migrate1To2InsertUserNoTx = `
INSERT INTO user (id, user, pass, role, sync_topic, created)
SELECT ?, user, pass, role, ?, UNIXEPOCH() FROM user_old WHERE user = ?
`

	// Copies the old access table into user_access (resolving usernames to user IDs) and
	// drops the old tables.
	migrate1To2InsertFromOldTablesAndDropNoTx = `
INSERT INTO user_access (user_id, topic, read, write)
SELECT u.id, a.topic, a.read, a.write
FROM user u
JOIN access a ON u.user = a.user;
DROP TABLE access;
DROP TABLE user_old;
`

	// 2 -> 3
	// Adds the subscription interval column and splits the tier price ID into monthly and
	// yearly columns, each with its own unique index.
	migrate2To3UpdateQueries = `
ALTER TABLE user ADD COLUMN stripe_subscription_interval TEXT;
ALTER TABLE tier RENAME COLUMN stripe_price_id TO stripe_monthly_price_id;
ALTER TABLE tier ADD COLUMN stripe_yearly_price_id TEXT;
DROP INDEX IF EXISTS idx_tier_price_id;
CREATE UNIQUE INDEX idx_tier_stripe_monthly_price_id ON tier (stripe_monthly_price_id);
CREATE UNIQUE INDEX idx_tier_stripe_yearly_price_id ON tier (stripe_yearly_price_id);
`

	// 3 -> 4
	// Adds the calls limit/stat columns and the user_phone table.
	migrate3To4UpdateQueries = `
ALTER TABLE tier ADD COLUMN calls_limit INT NOT NULL DEFAULT (0);
ALTER TABLE user ADD COLUMN stats_calls INT NOT NULL DEFAULT (0);
CREATE TABLE IF NOT EXISTS user_phone (
user_id TEXT NOT NULL,
phone_number TEXT NOT NULL,
PRIMARY KEY (user_id, phone_number),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE
);
`

	// 4 -> 5
	// Backslash-escapes every underscore in stored topic patterns; the LIKE-based queries
	// in this file use ESCAPE '\'.
	migrate4To5UpdateQueries = `
UPDATE user_access SET topic = REPLACE(topic, '_', '\_');
`

	// 5 -> 6
	// Rebuilds the user and user_access tables with a new provisioned column (0 for all
	// existing rows), with foreign key enforcement switched off for the duration.
	migrate5To6UpdateQueries = `
PRAGMA foreign_keys=off;
-- Alter user table: Add provisioned column
ALTER TABLE user RENAME TO user_old;
CREATE TABLE IF NOT EXISTS user (
id TEXT PRIMARY KEY,
tier_id TEXT,
user TEXT NOT NULL,
pass TEXT NOT NULL,
role TEXT CHECK (role IN ('anonymous', 'admin', 'user')) NOT NULL,
prefs JSON NOT NULL DEFAULT '{}',
sync_topic TEXT NOT NULL,
provisioned INT NOT NULL,
stats_messages INT NOT NULL DEFAULT (0),
stats_emails INT NOT NULL DEFAULT (0),
stats_calls INT NOT NULL DEFAULT (0),
stripe_customer_id TEXT,
stripe_subscription_id TEXT,
stripe_subscription_status TEXT,
stripe_subscription_interval TEXT,
stripe_subscription_paid_until INT,
stripe_subscription_cancel_at INT,
created INT NOT NULL,
deleted INT,
FOREIGN KEY (tier_id) REFERENCES tier (id)
);
INSERT INTO user
SELECT
id,
tier_id,
user,
pass,
role,
prefs,
sync_topic,
0,
stats_messages,
stats_emails,
stats_calls,
stripe_customer_id,
stripe_subscription_id,
stripe_subscription_status,
stripe_subscription_interval,
stripe_subscription_paid_until,
stripe_subscription_cancel_at,
created, deleted
FROM user_old;
DROP TABLE user_old;
-- Alter user_access table: Add provisioned column
ALTER TABLE user_access RENAME TO user_access_old;
CREATE TABLE user_access (
user_id TEXT NOT NULL,
topic TEXT NOT NULL,
read INT NOT NULL,
write INT NOT NULL,
owner_user_id INT,
provisioned INTEGER NOT NULL,
PRIMARY KEY (user_id, topic),
FOREIGN KEY (user_id) REFERENCES user (id) ON DELETE CASCADE,
FOREIGN KEY (owner_user_id) REFERENCES user (id) ON DELETE CASCADE
);
INSERT INTO user_access SELECT *, 0 FROM user_access_old;
DROP TABLE user_access_old;
-- Recreate indices
CREATE UNIQUE INDEX idx_user ON user (user);
CREATE UNIQUE INDEX idx_user_stripe_customer_id ON user (stripe_customer_id);
CREATE UNIQUE INDEX idx_user_stripe_subscription_id ON user (stripe_subscription_id);
-- Re-enable foreign keys
PRAGMA foreign_keys=on;
`
)
  488. var (
  489. migrations = map[int]func(db *sql.DB) error{
  490. 1: migrateFrom1,
  491. 2: migrateFrom2,
  492. 3: migrateFrom3,
  493. 4: migrateFrom4,
  494. 5: migrateFrom5,
  495. }
  496. )
// Manager is an implementation of Auther. It stores users and access control list
// in a SQLite database.
type Manager struct {
	config     *Config                 // Configuration passed to NewManager (defaults are filled in there)
	db         *sql.DB                 // Database handle opened by NewManager
	statsQueue map[string]*Stats       // "Queue" to asynchronously write user stats to the database (UserID -> Stats)
	tokenQueue map[string]*TokenUpdate // "Queue" to asynchronously write token access stats to the database (Token ID -> TokenUpdate)
	mu         sync.Mutex              // NOTE(review): presumably guards statsQueue and tokenQueue; confirm at the call sites
}
// Config holds the configuration for the user Manager.
//
// NewManager modifies the Config it is given: a zero or negative BcryptCost is replaced by
// DefaultUserPasswordBcryptCost, and a zero or negative QueueWriterInterval by
// DefaultUserStatsQueueWriterInterval.
type Config struct {
	Filename            string              // Database filename, e.g. "/var/lib/ntfy/user.db"
	StartupQueries      string              // Queries to run on startup, e.g. to create initial users or tiers
	DefaultAccess       Permission          // Default permission if no ACL matches
	ProvisionEnabled    bool                // Enable auto-provisioning of users and access grants, disabled for "ntfy user" commands
	Users               []*User             // Predefined users to create on startup
	Access              map[string][]*Grant // Predefined access grants to create on startup
	QueueWriterInterval time.Duration       // Interval for the async queue writer to flush stats and token updates to the database
	BcryptCost          int                 // Cost of generated passwords; lowering makes testing faster
}
// Compile-time assertion that *Manager implements the Auther interface.
var _ Auther = (*Manager)(nil)
  518. // NewManager creates a new Manager instance
  519. func NewManager(config *Config) (*Manager, error) {
  520. // Set defaults
  521. if config.BcryptCost <= 0 {
  522. config.BcryptCost = DefaultUserPasswordBcryptCost
  523. }
  524. if config.QueueWriterInterval.Seconds() <= 0 {
  525. config.QueueWriterInterval = DefaultUserStatsQueueWriterInterval
  526. }
  527. // Check the parent directory of the database file (makes for friendly error messages)
  528. parentDir := filepath.Dir(config.Filename)
  529. if !util.FileExists(parentDir) {
  530. return nil, fmt.Errorf("user database directory %s does not exist or is not accessible", parentDir)
  531. }
  532. // Open DB and run setup queries
  533. db, err := sql.Open("sqlite3", config.Filename)
  534. if err != nil {
  535. return nil, err
  536. }
  537. if err := setupDB(db); err != nil {
  538. return nil, err
  539. }
  540. if err := runStartupQueries(db, config.StartupQueries); err != nil {
  541. return nil, err
  542. }
  543. manager := &Manager{
  544. db: db,
  545. config: config,
  546. statsQueue: make(map[string]*Stats),
  547. tokenQueue: make(map[string]*TokenUpdate),
  548. }
  549. if err := manager.maybeProvisionUsersAndAccess(); err != nil {
  550. return nil, err
  551. }
  552. go manager.asyncQueueWriter(config.QueueWriterInterval)
  553. return manager, nil
  554. }
  555. // Authenticate checks username and password and returns a User if correct, and the user has not been
  556. // marked as deleted. The method returns in constant-ish time, regardless of whether the user exists or
  557. // the password is correct or incorrect.
  558. func (a *Manager) Authenticate(username, password string) (*User, error) {
  559. if username == Everyone {
  560. return nil, ErrUnauthenticated
  561. }
  562. user, err := a.User(username)
  563. if err != nil {
  564. log.Tag(tag).Field("user_name", username).Err(err).Trace("Authentication of user failed (1)")
  565. bcrypt.CompareHashAndPassword([]byte(userAuthIntentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks"))
  566. return nil, ErrUnauthenticated
  567. } else if user.Deleted {
  568. log.Tag(tag).Field("user_name", username).Trace("Authentication of user failed (2): user marked deleted")
  569. bcrypt.CompareHashAndPassword([]byte(userAuthIntentionalSlowDownHash), []byte("intentional slow-down to avoid timing attacks"))
  570. return nil, ErrUnauthenticated
  571. } else if err := bcrypt.CompareHashAndPassword([]byte(user.Hash), []byte(password)); err != nil {
  572. log.Tag(tag).Field("user_name", username).Err(err).Trace("Authentication of user failed (3)")
  573. return nil, ErrUnauthenticated
  574. }
  575. return user, nil
  576. }
  577. // AuthenticateToken checks if the token exists and returns the associated User if it does.
  578. // The method sets the User.Token value to the token that was used for authentication.
  579. func (a *Manager) AuthenticateToken(token string) (*User, error) {
  580. if len(token) != tokenLength {
  581. return nil, ErrUnauthenticated
  582. }
  583. user, err := a.userByToken(token)
  584. if err != nil {
  585. log.Tag(tag).Field("token", token).Err(err).Trace("Authentication of token failed")
  586. return nil, ErrUnauthenticated
  587. }
  588. user.Token = token
  589. return user, nil
  590. }
  591. // CreateToken generates a random token for the given user and returns it. The token expires
  592. // after a fixed duration unless ChangeToken is called. This function also prunes tokens for the
  593. // given user, if there are too many of them.
  594. func (a *Manager) CreateToken(userID, label string, expires time.Time, origin netip.Addr) (*Token, error) {
  595. token := util.RandomLowerStringPrefix(tokenPrefix, tokenLength) // Lowercase only to support "<topic>+<token>@<domain>" email addresses
  596. tx, err := a.db.Begin()
  597. if err != nil {
  598. return nil, err
  599. }
  600. defer tx.Rollback()
  601. access := time.Now()
  602. if _, err := tx.Exec(insertTokenQuery, userID, token, label, access.Unix(), origin.String(), expires.Unix()); err != nil {
  603. return nil, err
  604. }
  605. rows, err := tx.Query(selectTokenCountQuery, userID)
  606. if err != nil {
  607. return nil, err
  608. }
  609. defer rows.Close()
  610. if !rows.Next() {
  611. return nil, errNoRows
  612. }
  613. var tokenCount int
  614. if err := rows.Scan(&tokenCount); err != nil {
  615. return nil, err
  616. }
  617. if tokenCount >= tokenMaxCount {
  618. // This pruning logic is done in two queries for efficiency. The SELECT above is a lookup
  619. // on two indices, whereas the query below is a full table scan.
  620. if _, err := tx.Exec(deleteExcessTokensQuery, userID, userID, tokenMaxCount); err != nil {
  621. return nil, err
  622. }
  623. }
  624. if err := tx.Commit(); err != nil {
  625. return nil, err
  626. }
  627. return &Token{
  628. Value: token,
  629. Label: label,
  630. LastAccess: access,
  631. LastOrigin: origin,
  632. Expires: expires,
  633. }, nil
  634. }
  635. // Tokens returns all existing tokens for the user with the given user ID
  636. func (a *Manager) Tokens(userID string) ([]*Token, error) {
  637. rows, err := a.db.Query(selectTokensQuery, userID)
  638. if err != nil {
  639. return nil, err
  640. }
  641. defer rows.Close()
  642. tokens := make([]*Token, 0)
  643. for {
  644. token, err := a.readToken(rows)
  645. if errors.Is(err, ErrTokenNotFound) {
  646. break
  647. } else if err != nil {
  648. return nil, err
  649. }
  650. tokens = append(tokens, token)
  651. }
  652. return tokens, nil
  653. }
  654. // Token returns a specific token for a user
  655. func (a *Manager) Token(userID, token string) (*Token, error) {
  656. rows, err := a.db.Query(selectTokenQuery, userID, token)
  657. if err != nil {
  658. return nil, err
  659. }
  660. defer rows.Close()
  661. return a.readToken(rows)
  662. }
  663. func (a *Manager) readToken(rows *sql.Rows) (*Token, error) {
  664. var token, label, lastOrigin string
  665. var lastAccess, expires int64
  666. if !rows.Next() {
  667. return nil, ErrTokenNotFound
  668. }
  669. if err := rows.Scan(&token, &label, &lastAccess, &lastOrigin, &expires); err != nil {
  670. return nil, err
  671. } else if err := rows.Err(); err != nil {
  672. return nil, err
  673. }
  674. lastOriginIP, err := netip.ParseAddr(lastOrigin)
  675. if err != nil {
  676. lastOriginIP = netip.IPv4Unspecified()
  677. }
  678. return &Token{
  679. Value: token,
  680. Label: label,
  681. LastAccess: time.Unix(lastAccess, 0),
  682. LastOrigin: lastOriginIP,
  683. Expires: time.Unix(expires, 0),
  684. }, nil
  685. }
  686. // ChangeToken updates a token's label and/or expiry date
  687. func (a *Manager) ChangeToken(userID, token string, label *string, expires *time.Time) (*Token, error) {
  688. if token == "" {
  689. return nil, errNoTokenProvided
  690. }
  691. tx, err := a.db.Begin()
  692. if err != nil {
  693. return nil, err
  694. }
  695. defer tx.Rollback()
  696. if label != nil {
  697. if _, err := tx.Exec(updateTokenLabelQuery, *label, userID, token); err != nil {
  698. return nil, err
  699. }
  700. }
  701. if expires != nil {
  702. if _, err := tx.Exec(updateTokenExpiryQuery, expires.Unix(), userID, token); err != nil {
  703. return nil, err
  704. }
  705. }
  706. if err := tx.Commit(); err != nil {
  707. return nil, err
  708. }
  709. return a.Token(userID, token)
  710. }
  711. // RemoveToken deletes the token defined in User.Token
  712. func (a *Manager) RemoveToken(userID, token string) error {
  713. if token == "" {
  714. return errNoTokenProvided
  715. }
  716. if _, err := a.db.Exec(deleteTokenQuery, userID, token); err != nil {
  717. return err
  718. }
  719. return nil
  720. }
  721. // RemoveExpiredTokens deletes all expired tokens from the database
  722. func (a *Manager) RemoveExpiredTokens() error {
  723. if _, err := a.db.Exec(deleteExpiredTokensQuery, time.Now().Unix()); err != nil {
  724. return err
  725. }
  726. return nil
  727. }
  728. // PhoneNumbers returns all phone numbers for the user with the given user ID
  729. func (a *Manager) PhoneNumbers(userID string) ([]string, error) {
  730. rows, err := a.db.Query(selectPhoneNumbersQuery, userID)
  731. if err != nil {
  732. return nil, err
  733. }
  734. defer rows.Close()
  735. phoneNumbers := make([]string, 0)
  736. for {
  737. phoneNumber, err := a.readPhoneNumber(rows)
  738. if err == ErrPhoneNumberNotFound {
  739. break
  740. } else if err != nil {
  741. return nil, err
  742. }
  743. phoneNumbers = append(phoneNumbers, phoneNumber)
  744. }
  745. return phoneNumbers, nil
  746. }
  747. func (a *Manager) readPhoneNumber(rows *sql.Rows) (string, error) {
  748. var phoneNumber string
  749. if !rows.Next() {
  750. return "", ErrPhoneNumberNotFound
  751. }
  752. if err := rows.Scan(&phoneNumber); err != nil {
  753. return "", err
  754. } else if err := rows.Err(); err != nil {
  755. return "", err
  756. }
  757. return phoneNumber, nil
  758. }
  759. // AddPhoneNumber adds a phone number to the user with the given user ID
  760. func (a *Manager) AddPhoneNumber(userID string, phoneNumber string) error {
  761. if _, err := a.db.Exec(insertPhoneNumberQuery, userID, phoneNumber); err != nil {
  762. if sqliteErr, ok := err.(sqlite3.Error); ok && sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique {
  763. return ErrPhoneNumberExists
  764. }
  765. return err
  766. }
  767. return nil
  768. }
  769. // RemovePhoneNumber deletes a phone number from the user with the given user ID
  770. func (a *Manager) RemovePhoneNumber(userID string, phoneNumber string) error {
  771. _, err := a.db.Exec(deletePhoneNumberQuery, userID, phoneNumber)
  772. return err
  773. }
  774. // RemoveDeletedUsers deletes all users that have been marked deleted for
  775. func (a *Manager) RemoveDeletedUsers() error {
  776. if _, err := a.db.Exec(deleteUsersMarkedQuery, time.Now().Unix()); err != nil {
  777. return err
  778. }
  779. return nil
  780. }
  781. // ChangeSettings persists the user settings
  782. func (a *Manager) ChangeSettings(userID string, prefs *Prefs) error {
  783. b, err := json.Marshal(prefs)
  784. if err != nil {
  785. return err
  786. }
  787. if _, err := a.db.Exec(updateUserPrefsQuery, string(b), userID); err != nil {
  788. return err
  789. }
  790. return nil
  791. }
  792. // ResetStats resets all user stats in the user database. This touches all users.
  793. func (a *Manager) ResetStats() error {
  794. a.mu.Lock() // Includes database query to avoid races!
  795. defer a.mu.Unlock()
  796. if _, err := a.db.Exec(updateUserStatsResetAllQuery); err != nil {
  797. return err
  798. }
  799. a.statsQueue = make(map[string]*Stats)
  800. return nil
  801. }
// EnqueueUserStats adds the user to a queue which writes out user stats (messages, emails, ..) in
// batches at a regular interval. Only the most recently enqueued stats per user ID are kept: a
// later call for the same user replaces the earlier entry.
func (a *Manager) EnqueueUserStats(userID string, stats *Stats) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.statsQueue[userID] = stats
}
// EnqueueTokenUpdate adds the token update to a queue which writes out token access times
// in batches at a regular interval. Only the most recently enqueued update per token ID is kept:
// a later call for the same token replaces the earlier entry.
func (a *Manager) EnqueueTokenUpdate(tokenID string, update *TokenUpdate) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.tokenQueue[tokenID] = update
}
  816. func (a *Manager) asyncQueueWriter(interval time.Duration) {
  817. ticker := time.NewTicker(interval)
  818. for range ticker.C {
  819. if err := a.writeUserStatsQueue(); err != nil {
  820. log.Tag(tag).Err(err).Warn("Writing user stats queue failed")
  821. }
  822. if err := a.writeTokenUpdateQueue(); err != nil {
  823. log.Tag(tag).Err(err).Warn("Writing token update queue failed")
  824. }
  825. }
  826. }
  827. func (a *Manager) writeUserStatsQueue() error {
  828. a.mu.Lock()
  829. if len(a.statsQueue) == 0 {
  830. a.mu.Unlock()
  831. log.Tag(tag).Trace("No user stats updates to commit")
  832. return nil
  833. }
  834. statsQueue := a.statsQueue
  835. a.statsQueue = make(map[string]*Stats)
  836. a.mu.Unlock()
  837. tx, err := a.db.Begin()
  838. if err != nil {
  839. return err
  840. }
  841. defer tx.Rollback()
  842. log.Tag(tag).Debug("Writing user stats queue for %d user(s)", len(statsQueue))
  843. for userID, update := range statsQueue {
  844. log.
  845. Tag(tag).
  846. Fields(log.Context{
  847. "user_id": userID,
  848. "messages_count": update.Messages,
  849. "emails_count": update.Emails,
  850. "calls_count": update.Calls,
  851. }).
  852. Trace("Updating stats for user %s", userID)
  853. if _, err := tx.Exec(updateUserStatsQuery, update.Messages, update.Emails, update.Calls, userID); err != nil {
  854. return err
  855. }
  856. }
  857. return tx.Commit()
  858. }
  859. func (a *Manager) writeTokenUpdateQueue() error {
  860. a.mu.Lock()
  861. if len(a.tokenQueue) == 0 {
  862. a.mu.Unlock()
  863. log.Tag(tag).Trace("No token updates to commit")
  864. return nil
  865. }
  866. tokenQueue := a.tokenQueue
  867. a.tokenQueue = make(map[string]*TokenUpdate)
  868. a.mu.Unlock()
  869. tx, err := a.db.Begin()
  870. if err != nil {
  871. return err
  872. }
  873. defer tx.Rollback()
  874. log.Tag(tag).Debug("Writing token update queue for %d token(s)", len(tokenQueue))
  875. for tokenID, update := range tokenQueue {
  876. log.Tag(tag).Trace("Updating token %s with last access time %v", tokenID, update.LastAccess.Unix())
  877. if _, err := tx.Exec(updateTokenLastAccessQuery, update.LastAccess.Unix(), update.LastOrigin.String(), tokenID); err != nil {
  878. return err
  879. }
  880. }
  881. return tx.Commit()
  882. }
  883. // Authorize returns nil if the given user has access to the given topic using the desired
  884. // permission. The user param may be nil to signal an anonymous user.
  885. func (a *Manager) Authorize(user *User, topic string, perm Permission) error {
  886. if user != nil && user.Role == RoleAdmin {
  887. return nil // Admin can do everything
  888. }
  889. username := Everyone
  890. if user != nil {
  891. username = user.Name
  892. }
  893. // Select the read/write permissions for this user/topic combo.
  894. // - The query may return two rows (one for everyone, and one for the user), but prioritizes the user.
  895. // - Furthermore, the query prioritizes more specific permissions (longer!) over more generic ones, e.g. "test*" > "*"
  896. // - It also prioritizes write permissions over read permissions
  897. rows, err := a.db.Query(selectTopicPermsQuery, Everyone, username, topic)
  898. if err != nil {
  899. return err
  900. }
  901. defer rows.Close()
  902. if !rows.Next() {
  903. return a.resolvePerms(a.config.DefaultAccess, perm)
  904. }
  905. var read, write bool
  906. if err := rows.Scan(&read, &write); err != nil {
  907. return err
  908. } else if err := rows.Err(); err != nil {
  909. return err
  910. }
  911. return a.resolvePerms(NewPermission(read, write), perm)
  912. }
  913. func (a *Manager) resolvePerms(base, perm Permission) error {
  914. if perm == PermissionRead && base.IsRead() {
  915. return nil
  916. } else if perm == PermissionWrite && base.IsWrite() {
  917. return nil
  918. }
  919. return ErrUnauthorized
  920. }
  921. // AddUser adds a user with the given username, password and role
  922. func (a *Manager) AddUser(username, password string, role Role, hashed bool) error {
  923. return execTx(a.db, func(tx *sql.Tx) error {
  924. return a.addUserTx(tx, username, password, role, hashed, false)
  925. })
  926. }
  927. // AddUser adds a user with the given username, password and role
  928. func (a *Manager) addUserTx(tx *sql.Tx, username, password string, role Role, hashed, provisioned bool) error {
  929. if !AllowedUsername(username) || !AllowedRole(role) {
  930. return ErrInvalidArgument
  931. }
  932. var hash string
  933. var err error = nil
  934. if hashed {
  935. hash = password
  936. if err := AllowedPasswordHash(hash); err != nil {
  937. return err
  938. }
  939. } else {
  940. hash, err = a.HashPassword(password)
  941. if err != nil {
  942. return err
  943. }
  944. }
  945. userID := util.RandomStringPrefix(userIDPrefix, userIDLength)
  946. syncTopic, now := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength), time.Now().Unix()
  947. if _, err = tx.Exec(insertUserQuery, userID, username, hash, role, syncTopic, provisioned, now); err != nil {
  948. if errors.Is(err, sqlite3.ErrConstraintUnique) {
  949. return ErrUserExists
  950. }
  951. return err
  952. }
  953. return nil
  954. }
  955. // RemoveUser deletes the user with the given username. The function returns nil on success, even
  956. // if the user did not exist in the first place.
  957. func (a *Manager) RemoveUser(username string) error {
  958. return execTx(a.db, func(tx *sql.Tx) error {
  959. return a.removeUserTx(tx, username)
  960. })
  961. }
  962. func (a *Manager) removeUserTx(tx *sql.Tx, username string) error {
  963. if !AllowedUsername(username) {
  964. return ErrInvalidArgument
  965. }
  966. // Rows in user_access, user_token, etc. are deleted via foreign keys
  967. if _, err := tx.Exec(deleteUserQuery, username); err != nil {
  968. return err
  969. }
  970. return nil
  971. }
  972. // MarkUserRemoved sets the deleted flag on the user, and deletes all access tokens. This prevents
  973. // successful auth via Authenticate. A background process will delete the user at a later date.
  974. func (a *Manager) MarkUserRemoved(user *User) error {
  975. if !AllowedUsername(user.Name) {
  976. return ErrInvalidArgument
  977. }
  978. tx, err := a.db.Begin()
  979. if err != nil {
  980. return err
  981. }
  982. defer tx.Rollback()
  983. if _, err := tx.Exec(deleteUserAccessQuery, user.Name, user.Name); err != nil {
  984. return err
  985. }
  986. if _, err := tx.Exec(deleteAllTokenQuery, user.ID); err != nil {
  987. return err
  988. }
  989. if _, err := tx.Exec(updateUserDeletedQuery, time.Now().Add(userHardDeleteAfterDuration).Unix(), user.ID); err != nil {
  990. return err
  991. }
  992. return tx.Commit()
  993. }
  994. // Users returns a list of users. It always also returns the Everyone user ("*").
  995. func (a *Manager) Users() ([]*User, error) {
  996. rows, err := a.db.Query(selectUsernamesQuery)
  997. if err != nil {
  998. return nil, err
  999. }
  1000. defer rows.Close()
  1001. usernames := make([]string, 0)
  1002. for rows.Next() {
  1003. var username string
  1004. if err := rows.Scan(&username); err != nil {
  1005. return nil, err
  1006. } else if err := rows.Err(); err != nil {
  1007. return nil, err
  1008. }
  1009. usernames = append(usernames, username)
  1010. }
  1011. rows.Close()
  1012. users := make([]*User, 0)
  1013. for _, username := range usernames {
  1014. user, err := a.User(username)
  1015. if err != nil {
  1016. return nil, err
  1017. }
  1018. users = append(users, user)
  1019. }
  1020. return users, nil
  1021. }
  1022. // UsersCount returns the number of users in the databsae
  1023. func (a *Manager) UsersCount() (int64, error) {
  1024. rows, err := a.db.Query(selectUserCountQuery)
  1025. if err != nil {
  1026. return 0, err
  1027. }
  1028. defer rows.Close()
  1029. if !rows.Next() {
  1030. return 0, errNoRows
  1031. }
  1032. var count int64
  1033. if err := rows.Scan(&count); err != nil {
  1034. return 0, err
  1035. }
  1036. return count, nil
  1037. }
  1038. // User returns the user with the given username if it exists, or ErrUserNotFound otherwise.
  1039. // You may also pass Everyone to retrieve the anonymous user and its Grant list.
  1040. func (a *Manager) User(username string) (*User, error) {
  1041. rows, err := a.db.Query(selectUserByNameQuery, username)
  1042. if err != nil {
  1043. return nil, err
  1044. }
  1045. return a.readUser(rows)
  1046. }
  1047. // UserByID returns the user with the given ID if it exists, or ErrUserNotFound otherwise
  1048. func (a *Manager) UserByID(id string) (*User, error) {
  1049. rows, err := a.db.Query(selectUserByIDQuery, id)
  1050. if err != nil {
  1051. return nil, err
  1052. }
  1053. return a.readUser(rows)
  1054. }
  1055. // UserByStripeCustomer returns the user with the given Stripe customer ID if it exists, or ErrUserNotFound otherwise.
  1056. func (a *Manager) UserByStripeCustomer(stripeCustomerID string) (*User, error) {
  1057. rows, err := a.db.Query(selectUserByStripeCustomerIDQuery, stripeCustomerID)
  1058. if err != nil {
  1059. return nil, err
  1060. }
  1061. return a.readUser(rows)
  1062. }
  1063. func (a *Manager) userByToken(token string) (*User, error) {
  1064. rows, err := a.db.Query(selectUserByTokenQuery, token, time.Now().Unix())
  1065. if err != nil {
  1066. return nil, err
  1067. }
  1068. return a.readUser(rows)
  1069. }
  1070. func (a *Manager) readUser(rows *sql.Rows) (*User, error) {
  1071. defer rows.Close()
  1072. var id, username, hash, role, prefs, syncTopic string
  1073. var provisioned bool
  1074. var stripeCustomerID, stripeSubscriptionID, stripeSubscriptionStatus, stripeSubscriptionInterval, stripeMonthlyPriceID, stripeYearlyPriceID, tierID, tierCode, tierName sql.NullString
  1075. var messages, emails, calls int64
  1076. var messagesLimit, messagesExpiryDuration, emailsLimit, callsLimit, reservationsLimit, attachmentFileSizeLimit, attachmentTotalSizeLimit, attachmentExpiryDuration, attachmentBandwidthLimit, stripeSubscriptionPaidUntil, stripeSubscriptionCancelAt, deleted sql.NullInt64
  1077. if !rows.Next() {
  1078. return nil, ErrUserNotFound
  1079. }
  1080. if err := rows.Scan(&id, &username, &hash, &role, &prefs, &syncTopic, &provisioned, &messages, &emails, &calls, &stripeCustomerID, &stripeSubscriptionID, &stripeSubscriptionStatus, &stripeSubscriptionInterval, &stripeSubscriptionPaidUntil, &stripeSubscriptionCancelAt, &deleted, &tierID, &tierCode, &tierName, &messagesLimit, &messagesExpiryDuration, &emailsLimit, &callsLimit, &reservationsLimit, &attachmentFileSizeLimit, &attachmentTotalSizeLimit, &attachmentExpiryDuration, &attachmentBandwidthLimit, &stripeMonthlyPriceID, &stripeYearlyPriceID); err != nil {
  1081. return nil, err
  1082. } else if err := rows.Err(); err != nil {
  1083. return nil, err
  1084. }
  1085. user := &User{
  1086. ID: id,
  1087. Name: username,
  1088. Hash: hash,
  1089. Role: Role(role),
  1090. Prefs: &Prefs{},
  1091. SyncTopic: syncTopic,
  1092. Provisioned: provisioned,
  1093. Stats: &Stats{
  1094. Messages: messages,
  1095. Emails: emails,
  1096. Calls: calls,
  1097. },
  1098. Billing: &Billing{
  1099. StripeCustomerID: stripeCustomerID.String, // May be empty
  1100. StripeSubscriptionID: stripeSubscriptionID.String, // May be empty
  1101. StripeSubscriptionStatus: stripe.SubscriptionStatus(stripeSubscriptionStatus.String), // May be empty
  1102. StripeSubscriptionInterval: stripe.PriceRecurringInterval(stripeSubscriptionInterval.String), // May be empty
  1103. StripeSubscriptionPaidUntil: time.Unix(stripeSubscriptionPaidUntil.Int64, 0), // May be zero
  1104. StripeSubscriptionCancelAt: time.Unix(stripeSubscriptionCancelAt.Int64, 0), // May be zero
  1105. },
  1106. Deleted: deleted.Valid,
  1107. }
  1108. if err := json.Unmarshal([]byte(prefs), user.Prefs); err != nil {
  1109. return nil, err
  1110. }
  1111. if tierCode.Valid {
  1112. // See readTier() when this is changed!
  1113. user.Tier = &Tier{
  1114. ID: tierID.String,
  1115. Code: tierCode.String,
  1116. Name: tierName.String,
  1117. MessageLimit: messagesLimit.Int64,
  1118. MessageExpiryDuration: time.Duration(messagesExpiryDuration.Int64) * time.Second,
  1119. EmailLimit: emailsLimit.Int64,
  1120. CallLimit: callsLimit.Int64,
  1121. ReservationLimit: reservationsLimit.Int64,
  1122. AttachmentFileSizeLimit: attachmentFileSizeLimit.Int64,
  1123. AttachmentTotalSizeLimit: attachmentTotalSizeLimit.Int64,
  1124. AttachmentExpiryDuration: time.Duration(attachmentExpiryDuration.Int64) * time.Second,
  1125. AttachmentBandwidthLimit: attachmentBandwidthLimit.Int64,
  1126. StripeMonthlyPriceID: stripeMonthlyPriceID.String, // May be empty
  1127. StripeYearlyPriceID: stripeYearlyPriceID.String, // May be empty
  1128. }
  1129. }
  1130. return user, nil
  1131. }
  1132. // AllGrants returns all user-specific access control entries, mapped to their respective user IDs
  1133. func (a *Manager) AllGrants() (map[string][]Grant, error) {
  1134. rows, err := a.db.Query(selectUserAllAccessQuery)
  1135. if err != nil {
  1136. return nil, err
  1137. }
  1138. defer rows.Close()
  1139. grants := make(map[string][]Grant, 0)
  1140. for rows.Next() {
  1141. var userID, topic string
  1142. var read, write, provisioned bool
  1143. if err := rows.Scan(&userID, &topic, &read, &write, &provisioned); err != nil {
  1144. return nil, err
  1145. } else if err := rows.Err(); err != nil {
  1146. return nil, err
  1147. }
  1148. if _, ok := grants[userID]; !ok {
  1149. grants[userID] = make([]Grant, 0)
  1150. }
  1151. grants[userID] = append(grants[userID], Grant{
  1152. TopicPattern: fromSQLWildcard(topic),
  1153. Permission: NewPermission(read, write),
  1154. Provisioned: provisioned,
  1155. })
  1156. }
  1157. return grants, nil
  1158. }
  1159. // Grants returns all user-specific access control entries
  1160. func (a *Manager) Grants(username string) ([]Grant, error) {
  1161. rows, err := a.db.Query(selectUserAccessQuery, username)
  1162. if err != nil {
  1163. return nil, err
  1164. }
  1165. defer rows.Close()
  1166. grants := make([]Grant, 0)
  1167. for rows.Next() {
  1168. var topic string
  1169. var read, write, provisioned bool
  1170. if err := rows.Scan(&topic, &read, &write, &provisioned); err != nil {
  1171. return nil, err
  1172. } else if err := rows.Err(); err != nil {
  1173. return nil, err
  1174. }
  1175. grants = append(grants, Grant{
  1176. TopicPattern: fromSQLWildcard(topic),
  1177. Permission: NewPermission(read, write),
  1178. Provisioned: provisioned,
  1179. })
  1180. }
  1181. return grants, nil
  1182. }
  1183. // Reservations returns all user-owned topics, and the associated everyone-access
  1184. func (a *Manager) Reservations(username string) ([]Reservation, error) {
  1185. rows, err := a.db.Query(selectUserReservationsQuery, Everyone, username)
  1186. if err != nil {
  1187. return nil, err
  1188. }
  1189. defer rows.Close()
  1190. reservations := make([]Reservation, 0)
  1191. for rows.Next() {
  1192. var topic string
  1193. var ownerRead, ownerWrite bool
  1194. var everyoneRead, everyoneWrite sql.NullBool
  1195. if err := rows.Scan(&topic, &ownerRead, &ownerWrite, &everyoneRead, &everyoneWrite); err != nil {
  1196. return nil, err
  1197. } else if err := rows.Err(); err != nil {
  1198. return nil, err
  1199. }
  1200. reservations = append(reservations, Reservation{
  1201. Topic: unescapeUnderscore(topic),
  1202. Owner: NewPermission(ownerRead, ownerWrite),
  1203. Everyone: NewPermission(everyoneRead.Bool, everyoneWrite.Bool), // false if null
  1204. })
  1205. }
  1206. return reservations, nil
  1207. }
  1208. // HasReservation returns true if the given topic access is owned by the user
  1209. func (a *Manager) HasReservation(username, topic string) (bool, error) {
  1210. rows, err := a.db.Query(selectUserHasReservationQuery, username, escapeUnderscore(topic))
  1211. if err != nil {
  1212. return false, err
  1213. }
  1214. defer rows.Close()
  1215. if !rows.Next() {
  1216. return false, errNoRows
  1217. }
  1218. var count int64
  1219. if err := rows.Scan(&count); err != nil {
  1220. return false, err
  1221. }
  1222. return count > 0, nil
  1223. }
  1224. // ReservationsCount returns the number of reservations owned by this user
  1225. func (a *Manager) ReservationsCount(username string) (int64, error) {
  1226. rows, err := a.db.Query(selectUserReservationsCountQuery, username)
  1227. if err != nil {
  1228. return 0, err
  1229. }
  1230. defer rows.Close()
  1231. if !rows.Next() {
  1232. return 0, errNoRows
  1233. }
  1234. var count int64
  1235. if err := rows.Scan(&count); err != nil {
  1236. return 0, err
  1237. }
  1238. return count, nil
  1239. }
  1240. // ReservationOwner returns user ID of the user that owns this topic, or an
  1241. // empty string if it's not owned by anyone
  1242. func (a *Manager) ReservationOwner(topic string) (string, error) {
  1243. rows, err := a.db.Query(selectUserReservationsOwnerQuery, escapeUnderscore(topic))
  1244. if err != nil {
  1245. return "", err
  1246. }
  1247. defer rows.Close()
  1248. if !rows.Next() {
  1249. return "", nil
  1250. }
  1251. var ownerUserID string
  1252. if err := rows.Scan(&ownerUserID); err != nil {
  1253. return "", err
  1254. }
  1255. return ownerUserID, nil
  1256. }
  1257. // ChangePassword changes a user's password
  1258. func (a *Manager) ChangePassword(username, password string, hashed bool) error {
  1259. return execTx(a.db, func(tx *sql.Tx) error {
  1260. return a.changePasswordTx(tx, username, password, hashed)
  1261. })
  1262. }
  1263. func (a *Manager) changePasswordTx(tx *sql.Tx, username, password string, hashed bool) error {
  1264. var hash string
  1265. var err error
  1266. if hashed {
  1267. hash = password
  1268. if err := AllowedPasswordHash(hash); err != nil {
  1269. return err
  1270. }
  1271. } else {
  1272. hash, err = a.HashPassword(password)
  1273. if err != nil {
  1274. return err
  1275. }
  1276. }
  1277. if _, err := tx.Exec(updateUserPassQuery, hash, username); err != nil {
  1278. return err
  1279. }
  1280. return nil
  1281. }
  1282. // ChangeRole changes a user's role. When a role is changed from RoleUser to RoleAdmin,
  1283. // all existing access control entries (Grant) are removed, since they are no longer needed.
  1284. func (a *Manager) ChangeRole(username string, role Role) error {
  1285. return execTx(a.db, func(tx *sql.Tx) error {
  1286. return a.changeRoleTx(tx, username, role)
  1287. })
  1288. }
  1289. func (a *Manager) changeRoleTx(tx *sql.Tx, username string, role Role) error {
  1290. if !AllowedUsername(username) || !AllowedRole(role) {
  1291. return ErrInvalidArgument
  1292. }
  1293. if _, err := tx.Exec(updateUserRoleQuery, string(role), username); err != nil {
  1294. return err
  1295. }
  1296. if role == RoleAdmin {
  1297. if _, err := tx.Exec(deleteUserAccessQuery, username, username); err != nil {
  1298. return err
  1299. }
  1300. }
  1301. return nil
  1302. }
  1303. // ChangeProvisioned changes the provisioned status of a user. This is used to mark users as
  1304. // provisioned. A provisioned user is a user defined in the config file.
  1305. func (a *Manager) ChangeProvisioned(username string, provisioned bool) error {
  1306. return execTx(a.db, func(tx *sql.Tx) error {
  1307. return a.changeProvisionedTx(tx, username, provisioned)
  1308. })
  1309. }
  1310. func (a *Manager) changeProvisionedTx(tx *sql.Tx, username string, provisioned bool) error {
  1311. if _, err := tx.Exec(updateUserProvisionedQuery, provisioned, username); err != nil {
  1312. return err
  1313. }
  1314. return nil
  1315. }
  1316. // ChangeTier changes a user's tier using the tier code. This function does not delete reservations, messages,
  1317. // or attachments, even if the new tier has lower limits in this regard. That has to be done elsewhere.
  1318. func (a *Manager) ChangeTier(username, tier string) error {
  1319. if !AllowedUsername(username) {
  1320. return ErrInvalidArgument
  1321. }
  1322. t, err := a.Tier(tier)
  1323. if err != nil {
  1324. return err
  1325. } else if err := a.checkReservationsLimit(username, t.ReservationLimit); err != nil {
  1326. return err
  1327. }
  1328. if _, err := a.db.Exec(updateUserTierQuery, tier, username); err != nil {
  1329. return err
  1330. }
  1331. return nil
  1332. }
  1333. // ResetTier removes the tier from the given user
  1334. func (a *Manager) ResetTier(username string) error {
  1335. if !AllowedUsername(username) && username != Everyone && username != "" {
  1336. return ErrInvalidArgument
  1337. } else if err := a.checkReservationsLimit(username, 0); err != nil {
  1338. return err
  1339. }
  1340. _, err := a.db.Exec(deleteUserTierQuery, username)
  1341. return err
  1342. }
  1343. func (a *Manager) checkReservationsLimit(username string, reservationsLimit int64) error {
  1344. u, err := a.User(username)
  1345. if err != nil {
  1346. return err
  1347. }
  1348. if u.Tier != nil && reservationsLimit < u.Tier.ReservationLimit {
  1349. reservations, err := a.Reservations(username)
  1350. if err != nil {
  1351. return err
  1352. } else if int64(len(reservations)) > reservationsLimit {
  1353. return ErrTooManyReservations
  1354. }
  1355. }
  1356. return nil
  1357. }
  1358. // AllowReservation tests if a user may create an access control entry for the given topic.
  1359. // If there are any ACL entries that are not owned by the user, an error is returned.
  1360. func (a *Manager) AllowReservation(username string, topic string) error {
  1361. if (!AllowedUsername(username) && username != Everyone) || !AllowedTopic(topic) {
  1362. return ErrInvalidArgument
  1363. }
  1364. rows, err := a.db.Query(selectOtherAccessCountQuery, escapeUnderscore(topic), escapeUnderscore(topic), username)
  1365. if err != nil {
  1366. return err
  1367. }
  1368. defer rows.Close()
  1369. if !rows.Next() {
  1370. return errNoRows
  1371. }
  1372. var otherCount int
  1373. if err := rows.Scan(&otherCount); err != nil {
  1374. return err
  1375. }
  1376. if otherCount > 0 {
  1377. return errTopicOwnedByOthers
  1378. }
  1379. return nil
  1380. }
  1381. // AllowAccess adds or updates an entry in th access control list for a specific user. It controls
  1382. // read/write access to a topic. The parameter topicPattern may include wildcards (*). The ACL entry
  1383. // owner may either be a user (username), or the system (empty).
  1384. func (a *Manager) AllowAccess(username string, topicPattern string, permission Permission) error {
  1385. return execTx(a.db, func(tx *sql.Tx) error {
  1386. return a.allowAccessTx(tx, username, topicPattern, permission, false)
  1387. })
  1388. }
  1389. func (a *Manager) allowAccessTx(tx *sql.Tx, username string, topicPattern string, permission Permission, provisioned bool) error {
  1390. if !AllowedUsername(username) && username != Everyone {
  1391. return ErrInvalidArgument
  1392. } else if !AllowedTopicPattern(topicPattern) {
  1393. return ErrInvalidArgument
  1394. }
  1395. owner := ""
  1396. if _, err := tx.Exec(upsertUserAccessQuery, username, toSQLWildcard(topicPattern), permission.IsRead(), permission.IsWrite(), owner, owner, provisioned); err != nil {
  1397. return err
  1398. }
  1399. return nil
  1400. }
  1401. // ResetAccess removes an access control list entry for a specific username/topic, or (if topic is
  1402. // empty) for an entire user. The parameter topicPattern may include wildcards (*).
  1403. func (a *Manager) ResetAccess(username string, topicPattern string) error {
  1404. if !AllowedUsername(username) && username != Everyone && username != "" {
  1405. return ErrInvalidArgument
  1406. } else if !AllowedTopicPattern(topicPattern) && topicPattern != "" {
  1407. return ErrInvalidArgument
  1408. }
  1409. if username == "" && topicPattern == "" {
  1410. _, err := a.db.Exec(deleteAllAccessQuery, username)
  1411. return err
  1412. } else if topicPattern == "" {
  1413. _, err := a.db.Exec(deleteUserAccessQuery, username, username)
  1414. return err
  1415. }
  1416. _, err := a.db.Exec(deleteTopicAccessQuery, username, username, toSQLWildcard(topicPattern))
  1417. return err
  1418. }
  1419. // AddReservation creates two access control entries for the given topic: one with full read/write access for the
  1420. // given user, and one for Everyone with the permission passed as everyone. The user also owns the entries, and
  1421. // can modify or delete them.
  1422. func (a *Manager) AddReservation(username string, topic string, everyone Permission) error {
  1423. if !AllowedUsername(username) || username == Everyone || !AllowedTopic(topic) {
  1424. return ErrInvalidArgument
  1425. }
  1426. tx, err := a.db.Begin()
  1427. if err != nil {
  1428. return err
  1429. }
  1430. defer tx.Rollback()
  1431. if _, err := tx.Exec(upsertUserAccessQuery, username, escapeUnderscore(topic), true, true, username, username, false); err != nil {
  1432. return err
  1433. }
  1434. if _, err := tx.Exec(upsertUserAccessQuery, Everyone, escapeUnderscore(topic), everyone.IsRead(), everyone.IsWrite(), username, username, false); err != nil {
  1435. return err
  1436. }
  1437. return tx.Commit()
  1438. }
  1439. // RemoveReservations deletes the access control entries associated with the given username/topic, as
  1440. // well as all entries with Everyone/topic. This is the counterpart for AddReservation.
  1441. func (a *Manager) RemoveReservations(username string, topics ...string) error {
  1442. if !AllowedUsername(username) || username == Everyone || len(topics) == 0 {
  1443. return ErrInvalidArgument
  1444. }
  1445. for _, topic := range topics {
  1446. if !AllowedTopic(topic) {
  1447. return ErrInvalidArgument
  1448. }
  1449. }
  1450. tx, err := a.db.Begin()
  1451. if err != nil {
  1452. return err
  1453. }
  1454. defer tx.Rollback()
  1455. for _, topic := range topics {
  1456. if _, err := tx.Exec(deleteTopicAccessQuery, username, username, escapeUnderscore(topic)); err != nil {
  1457. return err
  1458. }
  1459. if _, err := tx.Exec(deleteTopicAccessQuery, Everyone, Everyone, escapeUnderscore(topic)); err != nil {
  1460. return err
  1461. }
  1462. }
  1463. return tx.Commit()
  1464. }
  1465. // DefaultAccess returns the default read/write access if no access control entry matches
  1466. func (a *Manager) DefaultAccess() Permission {
  1467. return a.config.DefaultAccess
  1468. }
  1469. // AddTier creates a new tier in the database
  1470. func (a *Manager) AddTier(tier *Tier) error {
  1471. if tier.ID == "" {
  1472. tier.ID = util.RandomStringPrefix(tierIDPrefix, tierIDLength)
  1473. }
  1474. if _, err := a.db.Exec(insertTierQuery, tier.ID, tier.Code, tier.Name, tier.MessageLimit, int64(tier.MessageExpiryDuration.Seconds()), tier.EmailLimit, tier.CallLimit, tier.ReservationLimit, tier.AttachmentFileSizeLimit, tier.AttachmentTotalSizeLimit, int64(tier.AttachmentExpiryDuration.Seconds()), tier.AttachmentBandwidthLimit, nullString(tier.StripeMonthlyPriceID), nullString(tier.StripeYearlyPriceID)); err != nil {
  1475. return err
  1476. }
  1477. return nil
  1478. }
  1479. // UpdateTier updates a tier's properties in the database
  1480. func (a *Manager) UpdateTier(tier *Tier) error {
  1481. if _, err := a.db.Exec(updateTierQuery, tier.Name, tier.MessageLimit, int64(tier.MessageExpiryDuration.Seconds()), tier.EmailLimit, tier.CallLimit, tier.ReservationLimit, tier.AttachmentFileSizeLimit, tier.AttachmentTotalSizeLimit, int64(tier.AttachmentExpiryDuration.Seconds()), tier.AttachmentBandwidthLimit, nullString(tier.StripeMonthlyPriceID), nullString(tier.StripeYearlyPriceID), tier.Code); err != nil {
  1482. return err
  1483. }
  1484. return nil
  1485. }
  1486. // RemoveTier deletes the tier with the given code
  1487. func (a *Manager) RemoveTier(code string) error {
  1488. if !AllowedTier(code) {
  1489. return ErrInvalidArgument
  1490. }
  1491. // This fails if any user has this tier
  1492. if _, err := a.db.Exec(deleteTierQuery, code); err != nil {
  1493. return err
  1494. }
  1495. return nil
  1496. }
  1497. // ChangeBilling updates a user's billing fields, namely the Stripe customer ID, and subscription information
  1498. func (a *Manager) ChangeBilling(username string, billing *Billing) error {
  1499. if _, err := a.db.Exec(updateBillingQuery, nullString(billing.StripeCustomerID), nullString(billing.StripeSubscriptionID), nullString(string(billing.StripeSubscriptionStatus)), nullString(string(billing.StripeSubscriptionInterval)), nullInt64(billing.StripeSubscriptionPaidUntil.Unix()), nullInt64(billing.StripeSubscriptionCancelAt.Unix()), username); err != nil {
  1500. return err
  1501. }
  1502. return nil
  1503. }
  1504. // Tiers returns a list of all Tier structs
  1505. func (a *Manager) Tiers() ([]*Tier, error) {
  1506. rows, err := a.db.Query(selectTiersQuery)
  1507. if err != nil {
  1508. return nil, err
  1509. }
  1510. defer rows.Close()
  1511. tiers := make([]*Tier, 0)
  1512. for {
  1513. tier, err := a.readTier(rows)
  1514. if err == ErrTierNotFound {
  1515. break
  1516. } else if err != nil {
  1517. return nil, err
  1518. }
  1519. tiers = append(tiers, tier)
  1520. }
  1521. return tiers, nil
  1522. }
  1523. // Tier returns a Tier based on the code, or ErrTierNotFound if it does not exist
  1524. func (a *Manager) Tier(code string) (*Tier, error) {
  1525. rows, err := a.db.Query(selectTierByCodeQuery, code)
  1526. if err != nil {
  1527. return nil, err
  1528. }
  1529. defer rows.Close()
  1530. return a.readTier(rows)
  1531. }
  1532. // TierByStripePrice returns a Tier based on the Stripe price ID, or ErrTierNotFound if it does not exist
  1533. func (a *Manager) TierByStripePrice(priceID string) (*Tier, error) {
  1534. rows, err := a.db.Query(selectTierByPriceIDQuery, priceID, priceID)
  1535. if err != nil {
  1536. return nil, err
  1537. }
  1538. defer rows.Close()
  1539. return a.readTier(rows)
  1540. }
  1541. func (a *Manager) readTier(rows *sql.Rows) (*Tier, error) {
  1542. var id, code, name string
  1543. var stripeMonthlyPriceID, stripeYearlyPriceID sql.NullString
  1544. var messagesLimit, messagesExpiryDuration, emailsLimit, callsLimit, reservationsLimit, attachmentFileSizeLimit, attachmentTotalSizeLimit, attachmentExpiryDuration, attachmentBandwidthLimit sql.NullInt64
  1545. if !rows.Next() {
  1546. return nil, ErrTierNotFound
  1547. }
  1548. if err := rows.Scan(&id, &code, &name, &messagesLimit, &messagesExpiryDuration, &emailsLimit, &callsLimit, &reservationsLimit, &attachmentFileSizeLimit, &attachmentTotalSizeLimit, &attachmentExpiryDuration, &attachmentBandwidthLimit, &stripeMonthlyPriceID, &stripeYearlyPriceID); err != nil {
  1549. return nil, err
  1550. } else if err := rows.Err(); err != nil {
  1551. return nil, err
  1552. }
  1553. // When changed, note readUser() as well
  1554. return &Tier{
  1555. ID: id,
  1556. Code: code,
  1557. Name: name,
  1558. MessageLimit: messagesLimit.Int64,
  1559. MessageExpiryDuration: time.Duration(messagesExpiryDuration.Int64) * time.Second,
  1560. EmailLimit: emailsLimit.Int64,
  1561. CallLimit: callsLimit.Int64,
  1562. ReservationLimit: reservationsLimit.Int64,
  1563. AttachmentFileSizeLimit: attachmentFileSizeLimit.Int64,
  1564. AttachmentTotalSizeLimit: attachmentTotalSizeLimit.Int64,
  1565. AttachmentExpiryDuration: time.Duration(attachmentExpiryDuration.Int64) * time.Second,
  1566. AttachmentBandwidthLimit: attachmentBandwidthLimit.Int64,
  1567. StripeMonthlyPriceID: stripeMonthlyPriceID.String, // May be empty
  1568. StripeYearlyPriceID: stripeYearlyPriceID.String, // May be empty
  1569. }, nil
  1570. }
  1571. // HashPassword hashes the given password using bcrypt with the configured cost
  1572. func (a *Manager) HashPassword(password string) (string, error) {
  1573. hash, err := bcrypt.GenerateFromPassword([]byte(password), a.config.BcryptCost)
  1574. if err != nil {
  1575. return "", err
  1576. }
  1577. return string(hash), nil
  1578. }
  1579. // Close closes the underlying database
  1580. func (a *Manager) Close() error {
  1581. return a.db.Close()
  1582. }
  1583. func (a *Manager) maybeProvisionUsersAndAccess() error {
  1584. if !a.config.ProvisionEnabled {
  1585. return nil
  1586. }
  1587. users, err := a.Users()
  1588. if err != nil {
  1589. return err
  1590. }
  1591. provisionUsernames := util.Map(a.config.Users, func(u *User) string {
  1592. return u.Name
  1593. })
  1594. return execTx(a.db, func(tx *sql.Tx) error {
  1595. // Remove users that are provisioned, but not in the config anymore
  1596. for _, user := range users {
  1597. if user.Name == Everyone {
  1598. continue
  1599. } else if user.Provisioned && !util.Contains(provisionUsernames, user.Name) {
  1600. if err := a.removeUserTx(tx, user.Name); err != nil {
  1601. return fmt.Errorf("failed to remove provisioned user %s: %v", user.Name, err)
  1602. }
  1603. }
  1604. }
  1605. // Add or update provisioned users
  1606. for _, user := range a.config.Users {
  1607. if user.Name == Everyone {
  1608. continue
  1609. }
  1610. existingUser, exists := util.Find(users, func(u *User) bool {
  1611. return u.Name == user.Name
  1612. })
  1613. if !exists {
  1614. if err := a.addUserTx(tx, user.Name, user.Hash, user.Role, true, true); err != nil && !errors.Is(err, ErrUserExists) {
  1615. return fmt.Errorf("failed to add provisioned user %s: %v", user.Name, err)
  1616. }
  1617. } else {
  1618. if !existingUser.Provisioned {
  1619. if err := a.changeProvisionedTx(tx, user.Name, true); err != nil {
  1620. return fmt.Errorf("failed to change provisioned status for user %s: %v", user.Name, err)
  1621. }
  1622. }
  1623. if existingUser.Hash != user.Hash {
  1624. if err := a.changePasswordTx(tx, user.Name, user.Hash, true); err != nil {
  1625. return fmt.Errorf("failed to change password for provisioned user %s: %v", user.Name, err)
  1626. }
  1627. }
  1628. if existingUser.Role != user.Role {
  1629. if err := a.changeRoleTx(tx, user.Name, user.Role); err != nil {
  1630. return fmt.Errorf("failed to change role for provisioned user %s: %v", user.Name, err)
  1631. }
  1632. }
  1633. }
  1634. }
  1635. // Remove and (re-)add provisioned grants
  1636. if _, err := tx.Exec(deleteUserAccessProvisionedQuery); err != nil {
  1637. return err
  1638. }
  1639. for username, grants := range a.config.Access {
  1640. for _, grant := range grants {
  1641. if err := a.allowAccessTx(tx, username, grant.TopicPattern, grant.Permission, true); err != nil {
  1642. return err
  1643. }
  1644. }
  1645. }
  1646. return nil
  1647. })
  1648. }
  1649. // toSQLWildcard converts a wildcard string to a SQL wildcard string. It only allows '*' as wildcards,
  1650. // and escapes '_', assuming '\' as escape character.
  1651. func toSQLWildcard(s string) string {
  1652. return escapeUnderscore(strings.ReplaceAll(s, "*", "%"))
  1653. }
  1654. // fromSQLWildcard converts a SQL wildcard string to a wildcard string. It converts '%' to '*',
  1655. // and removes the '\_' escape character.
  1656. func fromSQLWildcard(s string) string {
  1657. return strings.ReplaceAll(unescapeUnderscore(s), "%", "*")
  1658. }
  // escapeUnderscore prefixes every underscore in s with a backslash.
  func escapeUnderscore(s string) string {
  	const plain, escaped = "_", "\\_"
  	return strings.ReplaceAll(s, plain, escaped)
  }
  // unescapeUnderscore replaces every backslash-underscore sequence in s with a plain underscore.
  func unescapeUnderscore(s string) string {
  	const escaped, plain = "\\_", "_"
  	return strings.ReplaceAll(s, escaped, plain)
  }
  1665. func runStartupQueries(db *sql.DB, startupQueries string) error {
  1666. if _, err := db.Exec(startupQueries); err != nil {
  1667. return err
  1668. }
  1669. if _, err := db.Exec(builtinStartupQueries); err != nil {
  1670. return err
  1671. }
  1672. return nil
  1673. }
  1674. func setupDB(db *sql.DB) error {
  1675. // If 'schemaVersion' table does not exist, this must be a new database
  1676. rowsSV, err := db.Query(selectSchemaVersionQuery)
  1677. if err != nil {
  1678. return setupNewDB(db)
  1679. }
  1680. defer rowsSV.Close()
  1681. // If 'schemaVersion' table exists, read version and potentially upgrade
  1682. schemaVersion := 0
  1683. if !rowsSV.Next() {
  1684. return errors.New("cannot determine schema version: database file may be corrupt")
  1685. }
  1686. if err := rowsSV.Scan(&schemaVersion); err != nil {
  1687. return err
  1688. }
  1689. rowsSV.Close()
  1690. // Do migrations
  1691. if schemaVersion == currentSchemaVersion {
  1692. return nil
  1693. } else if schemaVersion > currentSchemaVersion {
  1694. return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, currentSchemaVersion)
  1695. }
  1696. for i := schemaVersion; i < currentSchemaVersion; i++ {
  1697. fn, ok := migrations[i]
  1698. if !ok {
  1699. return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
  1700. } else if err := fn(db); err != nil {
  1701. return err
  1702. }
  1703. }
  1704. return nil
  1705. }
  1706. func setupNewDB(db *sql.DB) error {
  1707. if _, err := db.Exec(createTablesQueries); err != nil {
  1708. return err
  1709. }
  1710. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  1711. return err
  1712. }
  1713. return nil
  1714. }
  1715. func migrateFrom1(db *sql.DB) error {
  1716. log.Tag(tag).Info("Migrating user database schema: from 1 to 2")
  1717. tx, err := db.Begin()
  1718. if err != nil {
  1719. return err
  1720. }
  1721. defer tx.Rollback()
  1722. // Rename user -> user_old, and create new tables
  1723. if _, err := tx.Exec(migrate1To2CreateTablesQueries); err != nil {
  1724. return err
  1725. }
  1726. // Insert users from user_old into new user table, with ID and sync_topic
  1727. rows, err := tx.Query(migrate1To2SelectAllOldUsernamesNoTx)
  1728. if err != nil {
  1729. return err
  1730. }
  1731. defer rows.Close()
  1732. usernames := make([]string, 0)
  1733. for rows.Next() {
  1734. var username string
  1735. if err := rows.Scan(&username); err != nil {
  1736. return err
  1737. }
  1738. usernames = append(usernames, username)
  1739. }
  1740. if err := rows.Close(); err != nil {
  1741. return err
  1742. }
  1743. for _, username := range usernames {
  1744. userID := util.RandomStringPrefix(userIDPrefix, userIDLength)
  1745. syncTopic := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength)
  1746. if _, err := tx.Exec(migrate1To2InsertUserNoTx, userID, syncTopic, username); err != nil {
  1747. return err
  1748. }
  1749. }
  1750. // Migrate old "access" table to "user_access" and drop "access" and "user_old"
  1751. if _, err := tx.Exec(migrate1To2InsertFromOldTablesAndDropNoTx); err != nil {
  1752. return err
  1753. }
  1754. if _, err := tx.Exec(updateSchemaVersion, 2); err != nil {
  1755. return err
  1756. }
  1757. if err := tx.Commit(); err != nil {
  1758. return err
  1759. }
  1760. return nil
  1761. }
  1762. func migrateFrom2(db *sql.DB) error {
  1763. log.Tag(tag).Info("Migrating user database schema: from 2 to 3")
  1764. tx, err := db.Begin()
  1765. if err != nil {
  1766. return err
  1767. }
  1768. defer tx.Rollback()
  1769. if _, err := tx.Exec(migrate2To3UpdateQueries); err != nil {
  1770. return err
  1771. }
  1772. if _, err := tx.Exec(updateSchemaVersion, 3); err != nil {
  1773. return err
  1774. }
  1775. return tx.Commit()
  1776. }
  1777. func migrateFrom3(db *sql.DB) error {
  1778. log.Tag(tag).Info("Migrating user database schema: from 3 to 4")
  1779. tx, err := db.Begin()
  1780. if err != nil {
  1781. return err
  1782. }
  1783. defer tx.Rollback()
  1784. if _, err := tx.Exec(migrate3To4UpdateQueries); err != nil {
  1785. return err
  1786. }
  1787. if _, err := tx.Exec(updateSchemaVersion, 4); err != nil {
  1788. return err
  1789. }
  1790. return tx.Commit()
  1791. }
  1792. func migrateFrom4(db *sql.DB) error {
  1793. log.Tag(tag).Info("Migrating user database schema: from 4 to 5")
  1794. tx, err := db.Begin()
  1795. if err != nil {
  1796. return err
  1797. }
  1798. defer tx.Rollback()
  1799. if _, err := tx.Exec(migrate4To5UpdateQueries); err != nil {
  1800. return err
  1801. }
  1802. if _, err := tx.Exec(updateSchemaVersion, 5); err != nil {
  1803. return err
  1804. }
  1805. return tx.Commit()
  1806. }
  1807. func migrateFrom5(db *sql.DB) error {
  1808. log.Tag(tag).Info("Migrating user database schema: from 5 to 6")
  1809. tx, err := db.Begin()
  1810. if err != nil {
  1811. return err
  1812. }
  1813. defer tx.Rollback()
  1814. if _, err := tx.Exec(migrate5To6UpdateQueries); err != nil {
  1815. return err
  1816. }
  1817. if _, err := tx.Exec(updateSchemaVersion, 6); err != nil {
  1818. return err
  1819. }
  1820. return tx.Commit()
  1821. }
  // nullString converts s into a sql.NullString. The empty string maps to the invalid (NULL) value,
  // every other string to a valid value holding s.
  func nullString(s string) sql.NullString {
  	return sql.NullString{String: s, Valid: s != ""}
  }
  // nullInt64 converts v into a sql.NullInt64. Zero maps to the invalid (NULL) value, every other
  // number to a valid value holding v.
  func nullInt64(v int64) sql.NullInt64 {
  	return sql.NullInt64{Int64: v, Valid: v != 0}
  }
  // execTx executes f inside a transaction on db. If f returns nil, the transaction is committed and
  // the result of the commit is returned. If f returns an error (or panics), the transaction is rolled
  // back and, in the error case, the error of f is returned unchanged.
  func execTx(db *sql.DB, f func(tx *sql.Tx) error) error {
  	tx, err := db.Begin()
  	if err != nil {
  		return err
  	}
  	// Deferring the rollback also releases the transaction if f panics. Once the transaction
  	// has been committed, Rollback does nothing but return sql.ErrTxDone, which is ignored
  	// here; the error of f is the one that matters to callers.
  	defer tx.Rollback()
  	if err := f(tx); err != nil {
  		return err
  	}
  	return tx.Commit()
  }