-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Read only
- Loading branch information
Showing
35 changed files
with
897 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import { ObjectId } from 'bson'; | ||
import UserNotification from '../../../shared/user-notification'; | ||
import sendEvent from '../../kafka/events/producer'; | ||
import KafkaMessage from '../../kafka/kafka-message'; | ||
import logger from '../logger'; | ||
/** | ||
* A user notification is associated with an activity | ||
* | ||
* | ||
* Should have: | ||
* - activity: Object notification activity | ||
* - user: Actor notification user target | ||
* | ||
* may have: | ||
* - emailId(String) and email(EmailHead) | ||
* | ||
*/ | ||
|
||
const debug = logger.extend('commands:user-notification'); | ||
|
||
export async function createUserNotification(notification, sender) { | ||
let userNotification; | ||
const _id = new ObjectId(); | ||
try { | ||
userNotification = UserNotification.fromObject({ ...notification, _id }); | ||
} catch (e) { | ||
debug('Bad parameter: %O', e.message); | ||
throw e; | ||
} | ||
|
||
const kafkaMessage = KafkaMessage.fromObject(userNotification.user._id, { | ||
event: 'user:notification:create', | ||
sender, | ||
payload: userNotification, | ||
}); | ||
|
||
await sendEvent(kafkaMessage); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import logger from '../logger'; | ||
import UserNotification from '../../../shared/user-notification'; | ||
import { dbCol } from '../../mongodb'; | ||
import { ObjectId } from 'bson'; | ||
import sendNotification from '../../kafka/notifications/producer'; | ||
|
||
const debug = logger.extend('events:user-notifications'); | ||
|
||
export async function userNotificationCreateReceiver(kafkaMessage) { | ||
let userNotification; | ||
try { | ||
userNotification = UserNotification.fromObject(kafkaMessage.payload()); | ||
} catch (e) { | ||
debug('Invalid payload %s: %O', e.message, kafkaMessage.payload()); | ||
} | ||
|
||
try { | ||
const collection = await dbCol('userNotifications'); | ||
const { insertedCount } = await collection.insertOne({ ...userNotification, _id: new ObjectId(userNotification._id) }); | ||
if (insertedCount !== 1) { | ||
throw new Error('No document have been inserted in the datastore.'); | ||
} | ||
} catch (e) { | ||
debug('Can not insert new user notification: %s', e.message); | ||
return; | ||
} | ||
|
||
const notification = kafkaMessage.setEvent('user:notification:created'); | ||
|
||
await sendNotification(notification); | ||
} | ||
|
||
export const EVENTS = { | ||
'user:notification:create': userNotificationCreateReceiver, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import { writable } from 'svelte/store'; | ||
import { get } from 'api'; | ||
import { registerEvent } from './sse'; | ||
import { isAfter, isBefore } from 'date-fns'; | ||
import EmailHead from '../shared/email-head'; | ||
|
||
export const unreadNotifications = writable([]); | ||
|
||
export const start = () => { | ||
const unregister = registerEvent('user:notification:created', (payload) => { | ||
updateStore([payload]); | ||
}); | ||
loadFromServer(); | ||
}; | ||
|
||
const loadFromServer = async () => { | ||
try { | ||
const unreads = await get('/api/notifications'); | ||
updateStore(unreads); | ||
} catch (e) { | ||
console.log('Error loading notifications from store:', e.message, e); | ||
throw e; | ||
} | ||
}; | ||
|
||
const updateStore = (notifications) => { | ||
const toUpdate = notifications.filter(n => !n.seen); | ||
if (!toUpdate.length) { | ||
return; | ||
} | ||
unreadNotifications.update((notifs) => { | ||
const newStoreContents = [...notifs]; | ||
|
||
toUpdate.forEach(n => { | ||
const hydratedNotif = hydrateNotification(n); | ||
const index = newStoreContents.findIndex(notification => hydratedNotif._id === notification._id); | ||
if (index >= 0) { | ||
newStoreContents.splice(index, 1, hydratedNotif); | ||
} else { | ||
const previousNotifIndex = newStoreContents.findIndex(notification => { | ||
console.log(new Date(hydratedNotif.activity.date), new Date(notification.activity.date), isAfter(new Date(hydratedNotif.activity.date), new Date(notification.activity.date))); | ||
return isAfter(new Date(hydratedNotif.activity.date), new Date(notification.activity.date)); | ||
}); | ||
if (previousNotifIndex < 0) { | ||
newStoreContents.push(hydratedNotif); | ||
} else { | ||
newStoreContents.splice(previousNotifIndex, 0, hydratedNotif); | ||
} | ||
} | ||
}); | ||
|
||
return newStoreContents; | ||
}); | ||
}; | ||
|
||
const hydrateNotification = (n) => { | ||
const hydratedNotification = { ...n }; | ||
if (hydratedNotification.email) { | ||
hydratedNotification.email = EmailHead.fromObject(n.email); | ||
} | ||
return hydratedNotification; | ||
}; |
Oops, something went wrong.