// ConnectionManager.js (4.2 KB)
  1. import Connection from "./Connection";
  2. import {sha256} from "./utils";
  /**
   * Owns the set of running Connection objects and keeps it in sync with the
   * subscriptions handed to refresh(). Connection events are forwarded to at most
   * one state listener and one notification listener.
   */
  class ConnectionManager {
      constructor() {
          this.connections = new Map(); // ConnectionId -> Connection (id is a truncated hash, see makeConnectionId)
          this.stateListener = null; // Fired when connection state changes
          this.notificationListener = null; // Fired when new notifications arrive
      }

      /**
       * Installs the state listener, replacing any previously registered one.
       *
       * @param {Function} listener - called as listener(subscriptionId, state)
       */
      registerStateListener(listener) {
          this.stateListener = listener;
      }

      /** Removes the state listener; later state changes are dropped. */
      resetStateListener() {
          this.stateListener = null;
      }

      /**
       * Installs the notification listener, replacing any previously registered one.
       *
       * @param {Function} listener - called as listener(subscriptionId, notification)
       */
      registerNotificationListener(listener) {
          this.notificationListener = listener;
      }

      /** Removes the notification listener; later notifications are dropped. */
      resetNotificationListener() {
          this.notificationListener = null;
      }

      /**
       * Figures out which connections should be running by comparing the current
       * state of the world (this.connections) with the target state derived from
       * the given subscriptions.
       *
       * Every subscription is paired with the first user whose baseUrl matches (if
       * any) and gets a "connectionId" from makeConnectionId: the first 10 characters
       * of sha256($subscriptionId|$username|$password), or of sha256($subscriptionId)
       * when no user matches. If any of those inputs change, the id changes, so the
       * old connection is closed and a new one is started.
       *
       * Does nothing when either argument is missing.
       *
       * @param {Array<Object>} subscriptions - objects with id, baseUrl, topic and last
       * @param {Array<Object>} users - objects with baseUrl, username and password
       * @returns {Promise<void>}
       */
      async refresh(subscriptions, users) {
          if (!subscriptions || !users) {
              return;
          }
          console.log(`[ConnectionManager] Refreshing connections`);

          const targets = await Promise.all(subscriptions.map(async (subscription) => {
              const user = users.find(u => u.baseUrl === subscription.baseUrl);
              const connectionId = await makeConnectionId(subscription, user);
              return {...subscription, user, connectionId};
          }));

          // A Set keeps the "is this id still wanted?" check constant-time per connection
          const targetIds = new Set(targets.map(target => target.connectionId));
          const deletedIds = Array.from(this.connections.keys()).filter(id => !targetIds.has(id));

          // Create and add new connections
          for (const target of targets) {
              const {connectionId, id: subscriptionId, baseUrl, topic, user, last: since} = target;
              if (this.connections.has(connectionId)) {
                  continue; // Already running with identical subscription/credentials
              }
              const connection = new Connection(
                  connectionId,
                  subscriptionId,
                  baseUrl,
                  topic,
                  user,
                  since,
                  (subscriptionId, notification) => this.notificationReceived(subscriptionId, notification),
                  (subscriptionId, state) => this.stateChanged(subscriptionId, state)
              );
              this.connections.set(connectionId, connection);
              console.log(`[ConnectionManager] Starting new connection ${connectionId} (subscription ${subscriptionId} with user ${user ? user.username : "anonymous"})`);
              connection.start();
          }

          // Delete old connections
          for (const id of deletedIds) {
              console.log(`[ConnectionManager] Closing connection ${id}`);
              const connection = this.connections.get(id);
              this.connections.delete(id); // Unregister first so the map never holds a closed connection
              connection.close();
          }
      }

      /**
       * Forwards a connection state change to the state listener, if one is set.
       * Errors thrown by the listener are logged and not propagated.
       *
       * @param {*} subscriptionId - id of the subscription whose connection changed
       * @param {*} state - new state, passed through unchanged
       */
      stateChanged(subscriptionId, state) {
          if (!this.stateListener) {
              return;
          }
          try {
              this.stateListener(subscriptionId, state);
          } catch (e) {
              console.error(`[ConnectionManager] Error updating state of ${subscriptionId} to ${state}`, e);
          }
      }

      /**
       * Forwards an incoming notification to the notification listener, if one is
       * set. Errors thrown by the listener are logged and not propagated.
       *
       * @param {*} subscriptionId - id of the subscription that received the notification
       * @param {*} notification - notification payload, passed through unchanged
       */
      notificationReceived(subscriptionId, notification) {
          if (!this.notificationListener) {
              return;
          }
          try {
              this.notificationListener(subscriptionId, notification);
          } catch (e) {
              console.error(`[ConnectionManager] Error handling notification for ${subscriptionId}`, e);
          }
      }
  }
  /**
   * Derives the identifier under which a connection is tracked.
   *
   * The identifier is the first 10 characters of a sha256 hash. With a user, the
   * hashed text is "$subscriptionId|$username|$password"; without one it is just
   * the subscription id.
   *
   * @param {Object} subscription - object whose id is hashed
   * @param {Object} [user] - optional object providing username and password
   * @returns {Promise<string>} the shortened hash
   */
  const makeConnectionId = async (subscription, user) => {
      const material = user
          ? `${subscription.id}|${user.username}|${user.password}`
          : `${subscription.id}`;
      const digest = await sha256(material);
      return digest.substring(0, 10);
  };
  // Single ConnectionManager instance created at module load; it is this module's default export.
  const connectionManager = new ConnectionManager();

  export {connectionManager as default};