binwiederhier 2 лет назад
Родитель
Сommit
625b13280f
10 измененных файлов с 250 добавлено и 46 удалено
  1. 0 1
      go.mod
  2. 0 31
      go.sum
  3. 28 1
      server/server.go
  4. 50 0
      server/server_access.go
  5. 100 0
      server/server_access_test.go
  6. 1 1
      server/server_account.go
  7. 9 0
      server/server_middleware.go
  8. 26 10
      server/topic.go
  9. 25 2
      server/topic_test.go
  10. 11 0
      server/types.go

+ 0 - 1
go.mod

@@ -57,7 +57,6 @@ require (
 	github.com/prometheus/client_model v0.4.0 // indirect
 	github.com/prometheus/common v0.43.0 // indirect
 	github.com/prometheus/procfs v0.9.0 // indirect
-	github.com/rogpeppe/go-internal v1.10.0 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/stretchr/objx v0.5.0 // indirect
 	github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect

+ 0 - 31
go.sum

@@ -1,7 +1,5 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys=
-cloud.google.com/go v0.110.0/go.mod h1:SJnCLqQ0FCFGSZMUNUf84MV3Aia54kn7pi8st7tMzaY=
 cloud.google.com/go v0.110.1 h1:oDJ19Fu9TX9Xs06iyCw4yifSqZ7JQ8BeuVHcTmWQlOA=
 cloud.google.com/go v0.110.1/go.mod h1:uc+V/WjzxQ7vpkxfJhgW4Q4axWXyfAerpQOuSNDZyFw=
 cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvuXY=
@@ -71,7 +69,6 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
-github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
 github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
 github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
 github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@@ -94,8 +91,6 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
-github.com/google/s2a-go v0.1.2 h1:WVtYAYuYxKeYajAmThMRYWP6K3wXkcqbGHeUgeubUHY=
-github.com/google/s2a-go v0.1.2/go.mod h1:OJpEgntRZo8ugHpF9hkoLJbS5dSI20XZeXJ9JVywLlM=
 github.com/google/s2a-go v0.1.3 h1:FAgZmpLl/SXurPEZyCMPBIiiYeTbqfjlbdnCNTAkbGE=
 github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -121,26 +116,17 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM=
-github.com/prometheus/client_golang v1.15.0/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
 github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI=
 github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
-github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
-github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
 github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
 github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
-github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
-github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
 github.com/prometheus/common v0.43.0 h1:iq+BVjvYLei5f27wiuNiB1DN6DYQkp1c8Bx0Vykh5us=
 github.com/prometheus/common v0.43.0/go.mod h1:NCvr5cQIh3Y/gy73/RdVtC9r8xxrxwJnB+2lB3BxrFc=
 github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
 github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
-github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
-github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
-github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -153,12 +139,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
-github.com/stripe/stripe-go/v74 v74.15.0 h1:P3ZYrY4CdZeV8Pc/205utqjur+5gcTef+9hgtj8P8IY=
-github.com/stripe/stripe-go/v74 v74.15.0/go.mod h1:f9L6LvaXa35ja7eyvP6GQswoaIPaBRvGAimAO+udbBw=
 github.com/stripe/stripe-go/v74 v74.17.0 h1:qVWSzmADr6gudznuAcPjB9ewzgxfyIhBCkyTbkxJcCw=
 github.com/stripe/stripe-go/v74 v74.17.0/go.mod h1:f9L6LvaXa35ja7eyvP6GQswoaIPaBRvGAimAO+udbBw=
-github.com/urfave/cli/v2 v2.25.1 h1:zw8dSP7ghX0Gmm8vugrs6q9Ku0wzweqPyshy+syu9Gw=
-github.com/urfave/cli/v2 v2.25.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
 github.com/urfave/cli/v2 v2.25.3 h1:VJkt6wvEBOoSjPFQvOkv6iWIrsJyCrKGtCtxXWwmGeY=
 github.com/urfave/cli/v2 v2.25.3/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
 github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
@@ -190,7 +172,6 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
 golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
@@ -203,10 +184,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
-golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
 golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -217,17 +195,12 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
-golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
 golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ=
-golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
 golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
 golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -252,8 +225,6 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
-google.golang.org/api v0.120.0 h1:TTmhTei0mkR+kiBSW2UzZmAbkTaBfUUzfchyXnzG9Hs=
-google.golang.org/api v0.120.0/go.mod h1:CrSvlNEFCFLae9ZUtL1z+61+rEBD7J/aCYwVYKZoWFU=
 google.golang.org/api v0.121.0 h1:8Oopoo8Vavxx6gt+sgs8s8/X60WBAtKQq6JqnkF+xow=
 google.golang.org/api v0.121.0/go.mod h1:gcitW0lvnyWjSp9nKxAbdHKIZ6vF4aajGueeslZOyms=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -276,8 +247,6 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp
 google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
 google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
-google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
-google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
 google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
 google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

+ 28 - 1
server/server.go

@@ -82,6 +82,8 @@ var (
 	apiHealthPath                                        = "/v1/health"
 	apiStatsPath                                         = "/v1/stats"
 	apiTiersPath                                         = "/v1/tiers"
+	apiUserPath                                          = "/v1/user"
+	apiAccessPath                                        = "/v1/access"
 	apiAccountPath                                       = "/v1/account"
 	apiAccountTokenPath                                  = "/v1/account/token"
 	apiAccountPasswordPath                               = "/v1/account/password"
@@ -411,6 +413,10 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visit
 		return s.handleHealth(w, r, v)
 	} else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
 		return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
+	} else if r.Method == http.MethodPost && r.URL.Path == apiAccessPath {
+		return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
+	} else if r.Method == http.MethodDelete && r.URL.Path == apiAccessPath {
+		return s.ensureAdmin(s.handleAccessReset)(w, r, v)
 	} else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
 		return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
 	} else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
@@ -1192,7 +1198,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
 	}
 	defer conn.Close()
 
-	// Subscription connections can be canceled externally, see topic.CancelSubscribers
+	// Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
 	cancelCtx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -1434,6 +1440,7 @@ func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visito
 	return nil
 }
 
+// topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
 func (s *Server) topicFromPath(path string) (*topic, error) {
 	parts := strings.Split(path, "/")
 	if len(parts) < 2 {
@@ -1442,6 +1449,7 @@ func (s *Server) topicFromPath(path string) (*topic, error) {
 	return s.topicFromID(parts[1])
 }
 
+// topicFromID returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
 func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
 	parts := strings.Split(path, "/")
 	if len(parts) < 2 {
@@ -1455,6 +1463,7 @@ func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
 	return topics, parts[1], nil
 }
 
+// topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
 func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -1474,6 +1483,7 @@ func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
 	return topics, nil
 }
 
+// topicFromID returns the topic with the given ID, creating it if it doesn't exist.
 func (s *Server) topicFromID(id string) (*topic, error) {
 	topics, err := s.topicsFromIDs(id)
 	if err != nil {
@@ -1482,6 +1492,23 @@ func (s *Server) topicFromID(id string) (*topic, error) {
 	return topics[0], nil
 }
 
+// topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
+func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
+	if err != nil {
+		return nil, err
+	}
+	topics := make([]*topic, 0)
+	for _, t := range s.topics {
+		if patternRegexp.MatchString(t.ID) {
+			topics = append(topics, t)
+		}
+	}
+	return topics, nil
+}
+
 func (s *Server) runSMTPServer() error {
 	s.smtpServerBackend = newMailBackend(s.config, s.handle)
 	s.smtpServer = smtp.NewServer(s.smtpServerBackend)

+ 50 - 0
server/server_access.go

@@ -0,0 +1,50 @@
+package server
+
+import (
+	"heckel.io/ntfy/user"
+	"net/http"
+)
+
+func (s *Server) handleAccessAllow(w http.ResponseWriter, r *http.Request, v *visitor) error {
+	req, err := readJSONWithLimit[apiAccessAllowRequest](r.Body, jsonBodyBytesLimit, false)
+	if err != nil {
+		return err
+	}
+	permission, err := user.ParsePermission(req.Permission)
+	if err != nil {
+		return errHTTPBadRequestPermissionInvalid
+	}
+	if err := s.userManager.AllowAccess(req.Username, req.Topic, permission); err != nil {
+		return err
+	}
+	return s.writeJSON(w, newSuccessResponse())
+}
+
+func (s *Server) handleAccessReset(w http.ResponseWriter, r *http.Request, v *visitor) error {
+	req, err := readJSONWithLimit[apiAccessResetRequest](r.Body, jsonBodyBytesLimit, false)
+	if err != nil {
+		return err
+	}
+	u, err := s.userManager.User(req.Username)
+	if err != nil {
+		return err
+	}
+	if err := s.userManager.ResetAccess(req.Username, req.Topic); err != nil {
+		return err
+	}
+	if err := s.killUserSubscriber(u, req.Topic); err != nil { // This may be a pattern
+		return err
+	}
+	return s.writeJSON(w, newSuccessResponse())
+}
+
+func (s *Server) killUserSubscriber(u *user.User, topicPattern string) error {
+	topics, err := s.topicsFromPattern(topicPattern)
+	if err != nil {
+		return err
+	}
+	for _, t := range topics {
+		t.CancelSubscriberUser(u.ID)
+	}
+	return nil
+}

+ 100 - 0
server/server_access_test.go

@@ -0,0 +1,100 @@
+package server
+
+import (
+	"github.com/stretchr/testify/require"
+	"heckel.io/ntfy/user"
+	"heckel.io/ntfy/util"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestAccess_AllowReset(t *testing.T) {
+	c := newTestConfigWithAuthFile(t)
+	c.AuthDefault = user.PermissionDenyAll
+	s := newTestServer(t, c)
+	defer s.closeDatabases()
+
+	// User and admin
+	require.Nil(t, s.userManager.AddUser("phil", "phil", user.RoleAdmin))
+	require.Nil(t, s.userManager.AddUser("ben", "ben", user.RoleUser))
+
+	// Subscribing not allowed
+	rr := request(t, s, "GET", "/gold/json?poll=1", "", map[string]string{
+		"Authorization": util.BasicAuth("ben", "ben"),
+	})
+	require.Equal(t, 403, rr.Code)
+
+	// Grant access
+	rr = request(t, s, "POST", "/v1/access", `{"username": "ben", "topic":"gold", "permission":"ro"}`, map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, rr.Code)
+
+	// Now subscribing is allowed
+	rr = request(t, s, "GET", "/gold/json?poll=1", "", map[string]string{
+		"Authorization": util.BasicAuth("ben", "ben"),
+	})
+	require.Equal(t, 200, rr.Code)
+
+	// Reset access
+	rr = request(t, s, "DELETE", "/v1/access", `{"username": "ben", "topic":"gold"}`, map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, rr.Code)
+
+	// Subscribing not allowed (again)
+	rr = request(t, s, "GET", "/gold/json?poll=1", "", map[string]string{
+		"Authorization": util.BasicAuth("ben", "ben"),
+	})
+	require.Equal(t, 403, rr.Code)
+}
+
+func TestAccess_AllowReset_NonAdminAttempt(t *testing.T) {
+	c := newTestConfigWithAuthFile(t)
+	c.AuthDefault = user.PermissionDenyAll
+	s := newTestServer(t, c)
+	defer s.closeDatabases()
+
+	// User
+	require.Nil(t, s.userManager.AddUser("ben", "ben", user.RoleUser))
+
+	// Grant access fails, because non-admin
+	rr := request(t, s, "POST", "/v1/access", `{"username": "ben", "topic":"gold", "permission":"ro"}`, map[string]string{
+		"Authorization": util.BasicAuth("ben", "ben"),
+	})
+	require.Equal(t, 401, rr.Code)
+}
+
+func TestAccess_AllowReset_KillConnection(t *testing.T) {
+	c := newTestConfigWithAuthFile(t)
+	c.AuthDefault = user.PermissionDenyAll
+	s := newTestServer(t, c)
+	defer s.closeDatabases()
+
+	// User and admin, grant access to "gol*" topics
+	require.Nil(t, s.userManager.AddUser("phil", "phil", user.RoleAdmin))
+	require.Nil(t, s.userManager.AddUser("ben", "ben", user.RoleUser))
+	require.Nil(t, s.userManager.AllowAccess("ben", "gol*", user.PermissionRead)) // Wildcard!
+
+	start, timeTaken := time.Now(), atomic.Int64{}
+	go func() {
+		rr := request(t, s, "GET", "/gold/json", "", map[string]string{
+			"Authorization": util.BasicAuth("ben", "ben"),
+		})
+		require.Equal(t, 200, rr.Code)
+		timeTaken.Store(time.Since(start).Milliseconds())
+	}()
+	time.Sleep(500 * time.Millisecond)
+
+	// Reset access
+	rr := request(t, s, "DELETE", "/v1/access", `{"username": "ben", "topic":"gol*"}`, map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, rr.Code)
+
+	// Wait for connection to be killed; this will fail if the connection is never killed
+	waitFor(t, func() bool {
+		return timeTaken.Load() >= 500
+	})
+}

+ 1 - 1
server/server_account.go

@@ -444,7 +444,7 @@ func (s *Server) handleAccountReservationAdd(w http.ResponseWriter, r *http.Requ
 	if err != nil {
 		return err
 	}
-	t.CancelSubscribers(u.ID)
+	t.CancelSubscribersExceptUser(u.ID)
 	return s.writeJSON(w, newSuccessResponse())
 }
 

+ 9 - 0
server/server_middleware.go

@@ -76,6 +76,15 @@ func (s *Server) ensureUser(next handleFunc) handleFunc {
 	})
 }
 
+func (s *Server) ensureAdmin(next handleFunc) handleFunc {
+	return s.ensureUserManager(func(w http.ResponseWriter, r *http.Request, v *visitor) error {
+		if !v.User().IsAdmin() {
+			return errHTTPUnauthorized
+		}
+		return next(w, r, v)
+	})
+}
+
 func (s *Server) ensurePaymentsEnabled(next handleFunc) handleFunc {
 	return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
 		if s.config.StripeSecretKey == "" || s.stripe == nil {

+ 26 - 10
server/topic.go

@@ -141,24 +141,40 @@ func (t *topic) Keepalive() {
 	t.lastAccess = time.Now()
 }
 
-// CancelSubscribers calls the cancel function for all subscribers, forcing
-func (t *topic) CancelSubscribers(exceptUserID string) {
+// CancelSubscribersExceptUser calls the cancel function for all subscribers, forcing
+func (t *topic) CancelSubscribersExceptUser(exceptUserID string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	for _, s := range t.subscribers {
 		if s.userID != exceptUserID {
-			log.
-				Tag(tagSubscribe).
-				With(t).
-				Fields(log.Context{
-					"user_id": s.userID,
-				}).
-				Debug("Canceling subscriber %s", s.userID)
-			s.cancel()
+			t.cancelUserSubscriber(s)
 		}
 	}
 }
 
+// CancelSubscriberUser kills the subscriber with the given user ID
+func (t *topic) CancelSubscriberUser(userID string) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	for _, s := range t.subscribers {
+		if s.userID == userID {
+			t.cancelUserSubscriber(s)
+			return
+		}
+	}
+}
+
+func (t *topic) cancelUserSubscriber(s *topicSubscriber) {
+	log.
+		Tag(tagSubscribe).
+		With(t).
+		Fields(log.Context{
+			"user_id": s.userID,
+		}).
+		Debug("Canceling subscriber with user ID %s", s.userID)
+	s.cancel()
+}
+
 func (t *topic) Context() log.Context {
 	t.mu.RLock()
 	defer t.mu.RUnlock()

+ 25 - 2
server/topic_test.go

@@ -9,7 +9,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestTopic_CancelSubscribers(t *testing.T) {
+func TestTopic_CancelSubscribersExceptUser(t *testing.T) {
 	t.Parallel()
 
 	subFn := func(v *visitor, msg *message) error {
@@ -27,11 +27,34 @@ func TestTopic_CancelSubscribers(t *testing.T) {
 	to.Subscribe(subFn, "", cancelFn1)
 	to.Subscribe(subFn, "u_phil", cancelFn2)
 
-	to.CancelSubscribers("u_phil")
+	to.CancelSubscribersExceptUser("u_phil")
 	require.True(t, canceled1.Load())
 	require.False(t, canceled2.Load())
 }
 
+func TestTopic_CancelSubscribersUser(t *testing.T) {
+	t.Parallel()
+
+	subFn := func(v *visitor, msg *message) error {
+		return nil
+	}
+	canceled1 := atomic.Bool{}
+	cancelFn1 := func() {
+		canceled1.Store(true)
+	}
+	canceled2 := atomic.Bool{}
+	cancelFn2 := func() {
+		canceled2.Store(true)
+	}
+	to := newTopic("mytopic")
+	to.Subscribe(subFn, "u_another", cancelFn1)
+	to.Subscribe(subFn, "u_phil", cancelFn2)
+
+	to.CancelSubscriberUser("u_phil")
+	require.False(t, canceled1.Load())
+	require.True(t, canceled2.Load())
+}
+
 func TestTopic_Keepalive(t *testing.T) {
 	t.Parallel()
 

+ 11 - 0
server/types.go

@@ -244,6 +244,17 @@ type apiStatsResponse struct {
 	MessagesRate float64 `json:"messages_rate"` // Average number of messages per second
 }
 
+type apiAccessAllowRequest struct {
+	Username   string `json:"username"`
+	Topic      string `json:"topic"`
+	Permission string `json:"permission"`
+}
+
+type apiAccessResetRequest struct {
+	Username string `json:"username"`
+	Topic    string `json:"topic"`
+}
+
 type apiAccountCreateRequest struct {
 	Username string `json:"username"`
 	Password string `json:"password"`