Skip to content

Commit

Permalink
init populate queue pkg 1#
Browse files Browse the repository at this point in the history
- add to middleware
  • Loading branch information
albertleigh committed Mar 12, 2019
1 parent 5755afe commit f46d562
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/event/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import actionsGenerator from './actionsGenerator';

export const actionsDict = actionsGenerator();

export default {
actionsDict:actionsGenerator(),
actionsDict,
}
13 changes: 13 additions & 0 deletions src/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ import {

import init from './init';

// queue
import {
SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION,
CONSUME_FROM_QUEUE_OF_ONE_SESSION
} from './queue/actions.constant';
import {
sendOneTxtMsgToQueueOfOneSessionHanlder,
consumeFromQueueOfOneSessionHanlder,
} from './queue/handlers';

// request
import {
SEND_ONE_TXT_MSG_REQUEST_OF_ONE_SESSION,
Expand All @@ -52,6 +62,9 @@ const actionHandlers = {
[PUBLISH_ONE_TXT_MSG_TO_ONE_SESSION]:publishOneTxtMsgToOneSessionHanlder,
[SUBSCRIBE_ONE_TOPIC_OF_ONE_SESSION]:subscribeOneTopicOfOneSessionHanlder,
[UNSUBSCRIBE_ONE_TOPIC_OF_ONE_SESSION]:unsubscribeOneTopicOfOneSessionHanlder,
// queue
[SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION]:sendOneTxtMsgToQueueOfOneSessionHanlder,
[CONSUME_FROM_QUEUE_OF_ONE_SESSION]:consumeFromQueueOfOneSessionHanlder,
// request
[SEND_ONE_TXT_MSG_REQUEST_OF_ONE_SESSION]:sendOneTxtMsgRequestOfOneSessionHanlder,
[REPLY_ONE_MSG_VIA_TXT_OF_ONE_SESSION]:replyOneMsgViaTxtOfOneSessionHanlder,
Expand Down
2 changes: 1 addition & 1 deletion src/publishSubscribe/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export async function unsubscribeOneTopicOfOneSession(action:Action<types.IUnsub
}catch (e) {
console.error(e.message);
dispatchAction(handlerActions.unsubscribeOneTopicOfOneSessionRes({
name:SUBSCRIBE_ONE_TOPIC_OF_ONE_SESSION_ERR_MSG,
name:UNSUBSCRIBE_ONE_TOPIC_OF_ONE_SESSION_ERR_MSG,
error:e,
}));
throw e;
Expand Down
2 changes: 0 additions & 2 deletions src/publishSubscribe/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { BaseRequestPayload, BaseResponsePayload } from '../base/BasePayload';

import { ISessionContextConfig, ISolaceContextPayload, SessionContext } from '../utils/SolaceContext';

export interface IPublishOneTxtMsgToOneSessionPayload extends BaseRequestPayload{
sessionId:string,
topicName:string,
Expand Down
7 changes: 7 additions & 0 deletions src/queue/actions.constant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import makeType from '../utils/makeType';

export const SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION = makeType('SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION');
export const SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION_RES = makeType('SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION_RES');

export const CONSUME_FROM_QUEUE_OF_ONE_SESSION = makeType('CONSUME_FROM_QUEUE_OF_ONE_SESSION');
export const CONSUME_FROM_QUEUE_OF_ONE_SESSION_RES = makeType('CONSUME_FROM_QUEUE_OF_ONE_SESSION_RES');
18 changes: 18 additions & 0 deletions src/queue/actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Action } from 'redux-actions';
import createFSA, { ActionCreator } from '../utils/createFSA';

import * as types from './types';
import * as actionTypes from './actions.constant';

export const sendOneTxtMsgToQueueOfOneSession:ActionCreator<types.ISendOneTxtMsgToQueueOfOneSessionPayload> =
createFSA<types.ISendOneTxtMsgToQueueOfOneSessionPayload>(
actionTypes.SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION,
(options:types.ISendOneTxtMsgToQueueOfOneSessionPayload) => <any> options
);


export const consumeFromQueueOfOneSession:ActionCreator<types.IConsumeFromQueueOfOneSessionPayload> =
createFSA<types.IConsumeFromQueueOfOneSessionPayload>(
actionTypes.CONSUME_FROM_QUEUE_OF_ONE_SESSION,
(options:types.IConsumeFromQueueOfOneSessionPayload) => <any> options
);
99 changes: 99 additions & 0 deletions src/queue/async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import {Action} from 'redux-actions';

import { initState, dispatchAction } from '../init';
import msgBuilder, {IMsgBuilt} from '../utils/msgBuilder';

import { actionsDict as EventActions } from '../event'

import * as types from "./types";
import * as handlerActions from './handlerActions';

const SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION_ERR_MSG='[redux-solace] Failed to send one txt msg to queue of one session';
const CONSUME_FROM_QUEUE_OF_ONE_SESSION_ERR_MSG='[redux-solace] Failed to consume from queue of one session';


export async function sendOneTxtMsgToQueueOfOneSession(action:Action<types.ISendOneTxtMsgToQueueOfOneSessionPayload>)
:Promise<Action<types.ISendOneTxtMsgToQueueOfOneSessionResPayload>>
{
const {
sessionId, queueName, msgTxt, userDataStr, userPropertyMap, correlationKey
} = action.payload;

try{
initState.solaceContext.sendOneTxtMsgToQueueOfOneSession(
sessionId, queueName, msgTxt, userDataStr, userPropertyMap, correlationKey,
);
const responoseAction = handlerActions.sendOneTxtMsgToQueueOfOneSessionRes({
sessionId, queueName, msgTxt, userDataStr, userPropertyMap, correlationKey,
});
dispatchAction(responoseAction);
return responoseAction;
}catch (e) {
console.error(e.message);
dispatchAction(handlerActions.sendOneTxtMsgToQueueOfOneSessionRes({
name:SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION_ERR_MSG,
error:e,
}));
throw e;
}

}

export async function consumeFromQueueOfOneSession(action:Action<types.IConsumeFromQueueOfOneSessionPayload>)
:Promise<Action<types.IConsumeFromQueueOfOneSessionResPayload>>
{
const {
sessionId, queueName, autoAcknowledge, otherCallbackDict,
} = action.payload;

const onMsgCb = (sessionEvent)=>{
const result = msgBuilder(sessionEvent);
dispatchAction(
EventActions[initState.solace.MessageConsumerEventName.MESSAGE]
.action({
sessionId, queueName,
...result,
})
)
};

const wrappedCbDict:{[key:number]:Function,[key:string]:Function} ={};

initState.solaceContext.messageConsumerEventCodes.forEach((oneEventCode)=>{
if (oneEventCode !== initState.solace.MessageConsumerEventName.MESSAGE){
if (otherCallbackDict && otherCallbackDict[oneEventCode]){
wrappedCbDict[oneEventCode] = function () {
dispatchAction(EventActions[oneEventCode].action({
sessionId, queueName, arguments
}));
return otherCallbackDict[oneEventCode]();
}
}else{
wrappedCbDict[oneEventCode] = function () {
dispatchAction(EventActions[oneEventCode].action({
sessionId, queueName, arguments
}));
}
}
}
});

try{
initState.solaceContext.consumeFromQueueOfOneSession(
sessionId, queueName, onMsgCb, autoAcknowledge, wrappedCbDict,
);
const responoseAction = handlerActions.consumeFromQueueOfOneSessionRes({
sessionId, queueName
});
dispatchAction(responoseAction);
return responoseAction;
}catch (e) {
console.error(e.message);
dispatchAction(handlerActions.consumeFromQueueOfOneSessionRes({
name:CONSUME_FROM_QUEUE_OF_ONE_SESSION_ERR_MSG,
error:e,
}));
throw e;
}

}
18 changes: 18 additions & 0 deletions src/queue/handlerActions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Action } from 'redux-actions';
import createFSA, { ActionCreator } from '../utils/createFSA';

import * as types from './types';
import * as actionTypes from './actions.constant';

export const sendOneTxtMsgToQueueOfOneSessionRes:ActionCreator<types.ISendOneTxtMsgToQueueOfOneSessionResPayload> =
createFSA<types.ISendOneTxtMsgToQueueOfOneSessionResPayload>(
actionTypes.SEND_ONE_TXT_MSG_TO_QUEUE_OF_ONE_SESSION_RES,
(options:types.ISendOneTxtMsgToQueueOfOneSessionResPayload) => <any> options
);


export const consumeFromQueueOfOneSessionRes:ActionCreator<types.IConsumeFromQueueOfOneSessionResPayload> =
createFSA<types.IConsumeFromQueueOfOneSessionResPayload>(
actionTypes.CONSUME_FROM_QUEUE_OF_ONE_SESSION_RES,
(options:types.IConsumeFromQueueOfOneSessionResPayload) => <any> options
);
16 changes: 16 additions & 0 deletions src/queue/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import {ActionHandlerParams} from "../GlobalTypes";
import * as asyncs from './async';

export const sendOneTxtMsgToQueueOfOneSessionHanlder = (params:ActionHandlerParams)=>{
const { action } = params;
asyncs.sendOneTxtMsgToQueueOfOneSession(action).catch( e=>{
// eat the exception
})
}

export const consumeFromQueueOfOneSessionHanlder = (params:ActionHandlerParams)=>{
const { action } = params;
asyncs.consumeFromQueueOfOneSession(action).catch( e=>{
// eat the exception
})
}
29 changes: 29 additions & 0 deletions src/queue/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { BaseRequestPayload, BaseResponsePayload } from '../base/BasePayload';

export interface ISendOneTxtMsgToQueueOfOneSessionPayload extends BaseRequestPayload{
sessionId:string,
queueName:string,
msgTxt:string,
userDataStr?:string,
userPropertyMap?:any,
correlationKey?:any,
}
export interface ISendOneTxtMsgToQueueOfOneSessionResPayload extends BaseResponsePayload{
sessionId?:string,
queueName?:string,
msgTxt?:string,
userDataStr?:string,
userPropertyMap?:any,
correlationKey?:any,
}

export interface IConsumeFromQueueOfOneSessionPayload extends BaseRequestPayload{
sessionId:string,
queueName:string,
autoAcknowledge?:boolean,
otherCallbackDict?:{[key:number]:Function,[key:string]:Function},
}
export interface IConsumeFromQueueOfOneSessionResPayload extends BaseResponsePayload{
sessionId?:string,
queueName?:string,
}
4 changes: 2 additions & 2 deletions src/utils/SolaceContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ export default class SolaceContext{
// queue and confirmed delivery

sendOneTxtMsgToQueueOfOneSession = (
sessionId:string, queueName:string, msgTxt:string, userDataStr:string, userPropertyMap:any, correlationKey:any=null
sessionId:string, queueName:string, msgTxt:string, userDataStr:string="", userPropertyMap:any={}, correlationKey:any=null
) =>{
if (!!this.sessionContextDict[sessionId]){
const context = this.sessionContextDict[sessionId];
Expand Down Expand Up @@ -441,7 +441,7 @@ export default class SolaceContext{
};

consumeFromQueueOfOneSession = (
sessionId:string, queueName:string, onMsgCb:Function,
sessionId:string, queueName:string, onMsgCb:(sessionEvent:any)=>void,
autoAck:boolean=true, otherCbDict:{[key:number]:Function,[key:string]:Function} = {}
) =>{
if(!!this.sessionContextDict[sessionId]){
Expand Down

0 comments on commit f46d562

Please sign in to comment.