Poller.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import api from "./Api";
  2. import prefs from "./Prefs";
  3. import subscriptionManager from "./SubscriptionManager";
  4. const delayMillis = 2000; // 2 seconds
  5. const intervalMillis = 300000; // 5 minutes
  6. class Poller {
  7. constructor() {
  8. this.timer = null;
  9. }
  10. startWorker() {
  11. if (this.timer !== null) {
  12. return;
  13. }
  14. console.log(`[Poller] Starting worker`);
  15. this.timer = setInterval(() => this.pollAll(), intervalMillis);
  16. setTimeout(() => this.pollAll(), delayMillis);
  17. }
  18. stopWorker() {
  19. clearTimeout(this.timer);
  20. }
  21. async pollAll() {
  22. console.log(`[Poller] Polling all subscriptions`);
  23. const subscriptions = await subscriptionManager.all();
  24. await Promise.all(
  25. subscriptions.map(async (s) => {
  26. try {
  27. await this.poll(s);
  28. } catch (e) {
  29. console.log(`[Poller] Error polling ${s.id}`, e);
  30. }
  31. })
  32. );
  33. }
  34. async poll(subscription) {
  35. console.log(`[Poller] Polling ${subscription.id}`);
  36. const since = subscription.last;
  37. const notifications = await api.poll(subscription.baseUrl, subscription.topic, since);
  38. // Filter out notifications older than the prune threshold
  39. const deleteAfterSeconds = await prefs.deleteAfter();
  40. const pruneThresholdTimestamp = deleteAfterSeconds > 0 ? Math.round(Date.now() / 1000) - deleteAfterSeconds : 0;
  41. const recentNotifications = pruneThresholdTimestamp > 0 ? notifications.filter((n) => n.time >= pruneThresholdTimestamp) : notifications;
  42. // Find the latest notification for each sequence ID
  43. const latestBySid = this.latestNotificationsBySid(recentNotifications);
  44. // Delete all existing notifications for which the latest notification is marked as deleted
  45. const deletedSids = Object.entries(latestBySid)
  46. .filter(([, notification]) => notification.deleted)
  47. .map(([sid]) => sid);
  48. if (deletedSids.length > 0) {
  49. console.log(`[Poller] Deleting notifications with deleted sequence IDs for ${subscription.id}`, deletedSids);
  50. await Promise.all(deletedSids.map((sid) => subscriptionManager.deleteNotificationBySid(subscription.id, sid)));
  51. }
  52. // Add only the latest notification for each non-deleted sequence
  53. const notificationsToAdd = Object.values(latestBySid).filter((n) => !n.deleted);
  54. if (notificationsToAdd.length > 0) {
  55. console.log(`[Poller] Adding ${notificationsToAdd.length} notification(s) for ${subscription.id}`);
  56. await subscriptionManager.addNotifications(subscription.id, notificationsToAdd);
  57. } else {
  58. console.log(`[Poller] No new notifications found for ${subscription.id}`);
  59. }
  60. }
  61. pollInBackground(subscription) {
  62. (async () => {
  63. try {
  64. await this.poll(subscription);
  65. } catch (e) {
  66. console.error(`[App] Error polling subscription ${subscription.id}`, e);
  67. }
  68. })();
  69. }
  70. /**
  71. * Groups notifications by sid and returns only the latest (highest time) for each sequence.
  72. * Returns an object mapping sid -> latest notification.
  73. */
  74. latestNotificationsBySid(notifications) {
  75. const latestBySid = {};
  76. notifications.forEach((notification) => {
  77. const sid = notification.sid || notification.id;
  78. if (!(sid in latestBySid) || notification.time >= latestBySid[sid].time) {
  79. latestBySid[sid] = notification;
  80. }
  81. });
  82. return latestBySid;
  83. }
  84. }
  85. const poller = new Poller();
  86. export default poller;