From 3924e9c4d73c04befeae3b235e2bc0ae60a6c317 Mon Sep 17 00:00:00 2001 From: David Beck Date: Sun, 31 Mar 2024 12:19:03 +0200 Subject: [PATCH] Implement Qt connect --- inc/finalmq/Qt/qt.h | 516 ++++++++++++++---- inc/finalmq/remoteentity/IRemoteEntity.h | 43 +- inc/finalmq/remoteentity/RemoteEntity.h | 6 + src/remoteentity/RemoteEntity.cpp | 63 ++- .../RemoteEntityFormatRegistry.cpp | 5 + 5 files changed, 484 insertions(+), 149 deletions(-) diff --git a/inc/finalmq/Qt/qt.h b/inc/finalmq/Qt/qt.h index ad834156..42d99a96 100644 --- a/inc/finalmq/Qt/qt.h +++ b/inc/finalmq/Qt/qt.h @@ -271,24 +271,103 @@ class FillObjectTree : public IObjectVisitor }; -//class NoFocusRectangleStyle : public QCommonStyle -//{ -//private: -// void drawPrimitive(PrimitiveElement element, -// const QStyleOption* option, -// QPainter* painter, -// const QWidget* widget) const -// { -// if (QStyle::PE_FrameFocusRect == element) -// { -// return; -// } -// else -// { -// QCommonStyle::drawPrimitive(element, option, painter, widget); -// } -// } -//}; +class ObjectHelper +{ +public: + static QObject* findObject(const QString& objectName) + { + QWidgetList widgetList = qApp->topLevelWidgets(); + for (int i = 0; i < widgetList.size(); ++i) + { + QObject* obj = widgetList[i]; + if (obj->objectName() == objectName) + { + return obj; + } + QList list = obj->findChildren(objectName, Qt::FindChildrenRecursively); + if (!list.isEmpty()) + { + return list[0]; + } + } + return nullptr; + } +}; + + +class ConnectObject : public QObject +{ +public: + ConnectObject(hybrid_ptr& remoteEntity, PeerId peerId, const std::string& objectName, const std::string& typeName, const QMetaMethod& metaMethod) + : m_remoteEntity(remoteEntity) + , m_peerId(peerId) + , m_typeName(typeName) + , m_path(objectName + "." + metaMethod.name().toStdString()) + , m_metaMethod(metaMethod) + { + } + virtual ~ConnectObject() + { + doDisconnect(); + } + void setConnection(const QMetaObject::Connection conn) + { + m_connection = conn; + } + bool isConnected() const + { + return m_connection; + } + void doDisconnect() + { + disconnect(m_connection); + } + +private: + virtual int qt_metacall(QMetaObject::Call /*call*/, int /*methodId*/, void** params) override + { + auto remoteEntity = m_remoteEntity.lock(); + if (remoteEntity) + { + QVariantList args; + const QList parameterTypes = m_metaMethod.parameterTypes(); + for (int i = 0; i < parameterTypes.size(); ++i) + { + int type = QMetaType::type(parameterTypes[i]); + args.append(QVariant(type, params[i + 1])); + } + + QByteArray bufferQt; + QDataStream streamParam(&bufferQt, QIODevice::WriteOnly); + streamParam << args; + + ZeroCopyBuffer bufferProto; + SerializerProto serializerProto(bufferProto); + ParserQt parserQt(serializerProto, bufferQt.data(), bufferQt.size(), ParserQt::Mode::WRAPPED_BY_QVARIANTLIST); + parserQt.parseStruct(m_typeName); + + GeneralMessage message; + + message.type = m_typeName; + bufferProto.copyData(message.data); + + remoteEntity->sendEvent(m_peerId, message); + } + else + { + doDisconnect(); + } + + return 0; + } + + const hybrid_ptr m_remoteEntity; + const PeerId m_peerId; + const std::string m_typeName; + const std::string m_path; + const const QMetaMethod m_metaMethod; + QMetaObject::Connection m_connection; +}; class QtInvoker : public RemoteEntity @@ -298,6 +377,18 @@ class QtInvoker : public RemoteEntity { init(); + // register peer events to see when a remote entity connects or disconnects. + registerPeerEvent([this](PeerId peerId, const SessionInfo& session, EntityId entityId, PeerEvent peerEvent, bool incoming) { + if (peerEvent == PeerEvent::PEER_DISCONNECTED) + { + auto itPeer = m_connectObjects.find(peerId); + if (itPeer != m_connectObjects.end()) + { + m_connectObjects.erase(itPeer); + } + } + }); + registerCommand("{objectid}/{method}", [this](const RequestContextPtr& requestContext, const std::shared_ptr& request) { bool found = false; @@ -315,73 +406,128 @@ class QtInvoker : public RemoteEntity QMetaMethod metaMethod; QMetaProperty metaProperty; bool propertySet = false; - const std::string& typeName = getTypeName(*objId, *methodName, obj, metaMethod, metaProperty, propertySet); + bool connect = false; + bool disconnect = false; + std::string connectTypeName; + const std::string& typeName = getTypeName(*objId, *methodName, obj, metaMethod, metaProperty, propertySet, connect, disconnect, connectTypeName); if ((obj != nullptr) && (typeName == request->type)) { if (metaMethod.isValid()) { - found = true; + if (connect) + { + found = true; + PeerId peerId = connectPeer(requestContext->session(), requestContext->getVirtualSessionId(), requestContext->entityId()); + + bool foundConnect = false; + auto itPeer = m_connectObjects.find(peerId); + if (itPeer != m_connectObjects.end()) + { + auto itSignalName = itPeer->second.find(metaMethod.name()); + if (itSignalName != itPeer->second.end()) + { + if (itSignalName->second->isConnected()) + { + foundConnect = true; + } + } + } + if (!foundConnect) + { + hybrid_ptr thisRemoteEntity = getWeakPtr(); + std::shared_ptr connectObject = std::make_shared(thisRemoteEntity, peerId, *objId, connectTypeName, metaMethod); + m_connectObjects[peerId][metaMethod.name()] = connectObject; - ZeroCopyBuffer buffer; - SerializerQt serializerQt(buffer, SerializerQt::Mode::WRAPPED_BY_QVARIANTLIST); - ParserProto parserProto(serializerQt, request->data.data(), request->data.size()); - parserProto.parseStruct(request->type); + QMetaObject::Connection conn = QMetaObject::connect(obj, metaMethod.methodIndex(), connectObject.get(), metaMethod.methodIndex(), Qt::AutoConnection); + connectObject->setConnection(conn); + } - QByteArray bufferByteArray; - bufferByteArray.reserve(static_cast(buffer.size())); - const std::list& chunks = buffer.chunks(); - for (const auto& chunk : chunks) + GeneralMessage replyMessage; + replyMessage.type = typeName; + requestContext->reply(replyMessage); + } + else if (disconnect) { - bufferByteArray.append(chunk.data(), static_cast(chunk.size())); + found = true; + PeerId peerId = requestContext->peerId(); + auto itPeer = m_connectObjects.find(peerId); + if (itPeer != m_connectObjects.end()) + { + auto itSignalName = itPeer->second.find(metaMethod.name()); + if (itSignalName != itPeer->second.end()) + { + itPeer->second.erase(itSignalName); + } + } + GeneralMessage replyMessage; + replyMessage.type = typeName; + requestContext->reply(replyMessage); } + else + { + found = true; - QVariantList parameters; - QDataStream streamInParams(bufferByteArray); - streamInParams >> parameters; + ZeroCopyBuffer buffer; + SerializerQt serializerQt(buffer, SerializerQt::Mode::WRAPPED_BY_QVARIANTLIST); + ParserProto parserProto(serializerQt, request->data.data(), request->data.size()); + parserProto.parseStruct(request->type); - std::array genericArguments; - for (int i = 0; i < parameters.length(); ++i) - { - const QVariant& parameter = parameters[i]; - genericArguments[i] = QGenericArgument(parameter.typeName(), parameter.constData()); - } + QByteArray bufferByteArray; + bufferByteArray.reserve(static_cast(buffer.size())); + const std::list& chunks = buffer.chunks(); + for (const auto& chunk : chunks) + { + bufferByteArray.append(chunk.data(), static_cast(chunk.size())); + } - QVariant returnValue(QMetaType::type(metaMethod.typeName()), - static_cast(NULL)); + QVariantList parameters; + QDataStream streamInParams(bufferByteArray); + streamInParams >> parameters; - QGenericReturnArgument returnArgument( - metaMethod.typeName(), - const_cast(returnValue.constData()) - ); - metaMethod.invoke(obj, returnArgument, - genericArguments[0], genericArguments[1], genericArguments[2], - genericArguments[3], genericArguments[4], genericArguments[5], - genericArguments[6], genericArguments[7], genericArguments[8], genericArguments[9]); + std::array genericArguments; + for (int i = 0; i < parameters.length(); ++i) + { + const QVariant& parameter = parameters[i]; + genericArguments[i] = QGenericArgument(parameter.typeName(), parameter.constData()); + } - QByteArray retQtBuffer; - QDataStream streamRetParam(&retQtBuffer, QIODevice::WriteOnly); - streamRetParam << returnValue; + QVariant returnValue(QMetaType::type(metaMethod.typeName()), + static_cast(NULL)); - std::string retTypeName = getReturnTypeName(metaMethod.typeName()); + QGenericReturnArgument returnArgument( + metaMethod.typeName(), + const_cast(returnValue.constData()) + ); + metaMethod.invoke(obj, returnArgument, + genericArguments[0], genericArguments[1], genericArguments[2], + genericArguments[3], genericArguments[4], genericArguments[5], + genericArguments[6], genericArguments[7], genericArguments[8], genericArguments[9]); - GeneralMessage replyMessage; - if (!retTypeName.empty()) - { - ZeroCopyBuffer bufferRet; - SerializerProto serializerProto(bufferRet); - ParserQt parserQt(serializerProto, retQtBuffer.data(), retQtBuffer.size(), ParserQt::Mode::WRAPPED_BY_QVARIANT); - parserQt.parseStruct(retTypeName); + QByteArray retQtBuffer; + QDataStream streamRetParam(&retQtBuffer, QIODevice::WriteOnly); + streamRetParam << returnValue; + std::string retTypeName = getReturnTypeName(metaMethod.typeName()); + + GeneralMessage replyMessage; + if (!retTypeName.empty()) + { + ZeroCopyBuffer bufferRet; + SerializerProto serializerProto(bufferRet); + ParserQt parserQt(serializerProto, retQtBuffer.data(), retQtBuffer.size(), ParserQt::Mode::WRAPPED_BY_QVARIANT); + parserQt.parseStruct(retTypeName); + + bufferRet.copyData(replyMessage.data); + } replyMessage.type = retTypeName; - bufferRet.copyData(replyMessage.data); + requestContext->reply(replyMessage); } - requestContext->reply(replyMessage); } else if (metaProperty.isValid()) { - found = true; if (propertySet) { + found = true; ZeroCopyBuffer buffer; SerializerQt serializerQt(buffer, SerializerQt::Mode::WRAPPED_BY_QVARIANT); ParserProto parserProto(serializerQt, request->data.data(), request->data.size()); @@ -403,6 +549,7 @@ class QtInvoker : public RemoteEntity if (result) { GeneralMessage replyMessage; + replyMessage.type = typeName; requestContext->reply(replyMessage); } else @@ -410,8 +557,65 @@ class QtInvoker : public RemoteEntity requestContext->reply(finalmq::Status::STATUS_REQUEST_PROCESSING_ERROR); } } + else if (connect) + { + if (metaProperty.hasNotifySignal()) + { + found = true; + metaMethod = metaProperty.notifySignal(); + PeerId peerId = connectPeer(requestContext->session(), requestContext->getVirtualSessionId(), requestContext->entityId()); + + bool foundConnect = false; + auto itPeer = m_connectObjects.find(peerId); + if (itPeer != m_connectObjects.end()) + { + auto itSignalName = itPeer->second.find(metaMethod.name()); + if (itSignalName != itPeer->second.end()) + { + if (itSignalName->second->isConnected()) + { + foundConnect = true; + } + } + } + if (!foundConnect) + { + hybrid_ptr thisRemoteEntity = getWeakPtr(); + std::shared_ptr connectObject = std::make_shared(thisRemoteEntity, peerId, *objId, connectTypeName, metaMethod); + m_connectObjects[peerId][metaMethod.name()] = connectObject; + + QMetaObject::Connection conn = QMetaObject::connect(obj, metaMethod.methodIndex(), connectObject.get(), metaMethod.methodIndex(), Qt::AutoConnection); + connectObject->setConnection(conn); + } + + GeneralMessage replyMessage; + replyMessage.type = typeName; + requestContext->reply(replyMessage); + } + } + else if (disconnect) + { + if (metaProperty.hasNotifySignal()) + { + found = true; + PeerId peerId = requestContext->peerId(); + auto itPeer = m_connectObjects.find(peerId); + if (itPeer != m_connectObjects.end()) + { + auto itSignalName = itPeer->second.find(metaMethod.name()); + if (itSignalName != itPeer->second.end()) + { + itPeer->second.erase(itSignalName); + } + } + GeneralMessage replyMessage; + replyMessage.type = typeName; + requestContext->reply(replyMessage); + } + } else { + found = true; QVariant value = metaProperty.read(obj); QByteArray retQtBuffer; @@ -555,7 +759,10 @@ class QtInvoker : public RemoteEntity QMetaMethod metaMethod; QMetaProperty metaProperty; bool propertySet = false; - return getTypeName(objId, methodName, obj, metaMethod, metaProperty, propertySet); + bool connect = false; + bool disconnect = false; + std::string connectTypeName; + return getTypeName(objId, methodName, obj, metaMethod, metaProperty, propertySet, connect, disconnect, connectTypeName); } std::string getReturnTypeName(const std::string& type) @@ -581,17 +788,23 @@ class QtInvoker : public RemoteEntity return typeName; } - std::string getTypeName(const std::string& objId, const std::string& methodName, QObject*& obj, QMetaMethod& metaMethod, QMetaProperty& metaProperty, bool& propertySet) + std::string getTypeName(const std::string& objId, const std::string& methodName, QObject*& obj, QMetaMethod& metaMethod, QMetaProperty& metaProperty, bool& propertySet, bool& connect, bool& disconnect, std::string& connectTypeName) { propertySet = false; + connect = false; + disconnect = false; + connectTypeName.clear(); + std::string typeOfGeneralMessage{}; - obj = findObject(QString::fromUtf8(objId.c_str())); + obj = ObjectHelper::findObject(QString::fromUtf8(objId.c_str())); if (obj) { const QMetaObject* metaObject = obj->metaObject(); if (metaObject) { - int ix = metaObject->indexOfMethod(methodName.c_str()); + int ix = -1; + + ix = metaObject->indexOfMethod(methodName.c_str()); if (ix != -1) { metaMethod = metaObject->method(ix); @@ -608,10 +821,20 @@ class QtInvoker : public RemoteEntity if (!metaMethod.isValid()) { - if (methodName.substr(0, 4) == "set_") + if (methodName.compare(0, 4, "set_") == 0) { propertySet = true; - ix = metaObject->indexOfProperty(methodName.substr(4).c_str()); + ix = metaObject->indexOfProperty(&methodName[4]); + } + if (methodName.compare(0, 8, "connect_") == 0) + { + connect = true; + ix = metaObject->indexOfProperty(&methodName[8]); + } + else if (methodName.compare(0, 11, "disconnect_") == 0) + { + disconnect = true; + ix = metaObject->indexOfProperty(&methodName[11]); } else { @@ -621,11 +844,54 @@ class QtInvoker : public RemoteEntity { metaProperty = metaObject->property(ix); } + else + { + propertySet = false; + connect = false; + disconnect = false; + } + } + + if (!metaMethod.isValid() && !metaProperty.isValid()) + { + if (methodName.compare(0, 8, "connect_") == 0) + { + connect = true; + ix = metaObject->indexOfSignal(&methodName[8]); + } + else if (methodName.compare(0, 11, "disconnect_") == 0) + { + disconnect = true; + ix = metaObject->indexOfSignal(&methodName[11]); + } + else + { + ix = metaObject->indexOfSignal(methodName.c_str()); + } + if (ix != -1) + { + metaMethod = metaObject->method(ix); + } + else + { + connect = false; + disconnect = false; + } } if (!metaMethod.isValid() && !metaProperty.isValid()) { - const QByteArray methodNameAsByteArray = methodName.c_str(); + QByteArray methodNameAsByteArray = methodName.c_str(); + if (methodName.compare(0, 8, "connect_") == 0) + { + connect = true; + methodNameAsByteArray = &methodName[8]; + } + else if (methodName.compare(0, 11, "disconnect_") == 0) + { + disconnect = true; + methodNameAsByteArray = &methodName[11]; + } for (int i = 0; i < metaObject->methodCount(); ++i) { QMetaMethod metaMethodTmp = metaObject->method(i); @@ -634,21 +900,37 @@ class QtInvoker : public RemoteEntity const QByteArray& name = metaMethodTmp.name(); if (name == methodNameAsByteArray) { - metaMethod = metaMethodTmp; - break; + if (connect || disconnect) + { + if (metaMethodTmp.methodType() == QMetaMethod::Signal) + { + metaMethod = metaMethodTmp; + break; + } + } + else + { + metaMethod = metaMethodTmp; + break; + } } } } + if (!metaMethod.isValid()) + { + connect = false; + disconnect = false; + } } if (metaMethod.isValid()) { + std::string typeName; QList argTypes = metaMethod.parameterTypes(); QList argNames = metaMethod.parameterNames(); if (argTypes.size() == argNames.size()) { - std::string typeName; for (int i = 0; i < argTypes.size(); ++i) { std::string name = parameterName(argNames[i].toStdString(), i); @@ -666,37 +948,49 @@ class QtInvoker : public RemoteEntity { typeName = '_'; } + } - bool ok = true; - const MetaStruct* struFound = MetaDataGlobal::instance().getStruct(typeName); - if (struFound == nullptr) + bool ok = true; + const MetaStruct* struFound = MetaDataGlobal::instance().getStruct(typeName); + if (struFound == nullptr) + { + MetaStruct stru{ typeName, "", {}, 0, {} }; + for (int i = 0; i < argTypes.size(); ++i) { - MetaStruct stru{ typeName, "", {}, 0, {} }; - for (int i = 0; i < argTypes.size(); ++i) + const std::string& type = argTypes[i].toStdString(); + const auto it = m_typesToField.find(type); + if (it != m_typesToField.end()) { - const std::string& type = argTypes[i].toStdString(); - const auto it = m_typesToField.find(type); - if (it != m_typesToField.end()) - { - std::string name = parameterName(argNames[i].toStdString(), i); - const MetaField& field = it->second; - stru.addField(MetaField(field.typeId, field.typeName, name, field.description, field.flags, field.attrs)); - } - else - { - ok = false; - break; - } + std::string name = parameterName(argNames[i].toStdString(), i); + const MetaField& field = it->second; + stru.addField(MetaField(field.typeId, field.typeName, name, field.description, field.flags, field.attrs)); } - if (ok) + else { - MetaDataGlobal::instance().addStruct(stru); + ok = false; + break; } } if (ok) { - typeOfGeneralMessage = typeName; + MetaDataGlobal::instance().addStruct(stru); + } + } + if (ok) + { + if (connect || disconnect) + { + connectTypeName = typeName; + typeName = '_'; + const MetaStruct* struFound = MetaDataGlobal::instance().getStruct(typeName); + if (struFound == nullptr) + { + MetaStruct stru{ typeName, "", {}, 0, {} }; + MetaDataGlobal::instance().addStruct(stru); + } } + + typeOfGeneralMessage = typeName; } } else if (metaProperty.isValid()) @@ -734,6 +1028,18 @@ class QtInvoker : public RemoteEntity } if (ok) { + if (connect || disconnect) + { + connectTypeName = typeName; + typeName = '_'; + const MetaStruct* struFound = MetaDataGlobal::instance().getStruct(typeName); + if (struFound == nullptr) + { + MetaStruct stru{ typeName, "", {}, 0, {} }; + MetaDataGlobal::instance().addStruct(stru); + } + } + typeOfGeneralMessage = typeName; } } @@ -742,27 +1048,9 @@ class QtInvoker : public RemoteEntity return typeOfGeneralMessage; } - static QObject* findObject(const QString& objectName) - { - QWidgetList widgetList = qApp->topLevelWidgets(); - for (int i = 0; i < widgetList.size(); ++i) - { - QObject* obj = widgetList[i]; - if (obj->objectName() == objectName) - { - return obj; - } - QList list = obj->findChildren(objectName, Qt::FindChildrenRecursively); - if (!list.isEmpty()) - { - return list[0]; - } - } - return nullptr; - } - - private: - std::unordered_map m_typesToField; +private: + std::unordered_map m_typesToField; + std::unordered_map>> m_connectObjects; }; diff --git a/inc/finalmq/remoteentity/IRemoteEntity.h b/inc/finalmq/remoteentity/IRemoteEntity.h index 2cbaa494..cc39ed9d 100644 --- a/inc/finalmq/remoteentity/IRemoteEntity.h +++ b/inc/finalmq/remoteentity/IRemoteEntity.h @@ -141,7 +141,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param structBase is the request message to send (generated code of fmq file). * @param funcReply is the reply callback. @@ -180,7 +180,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param metainfo is a key/value map of additional data besides the request data. Metainfo is very similar to HTTP headers. * @param structBase is the request message to send (generated code of fmq file). @@ -219,7 +219,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param structBase is the request message to send (generated code of fmq file). * @param funcReply is the reply callback. * @return if successful, valid correlation ID. @@ -256,7 +256,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param metainfo is a key/value map of additional data besides the request data. Metainfo is very similar to HTTP headers. * @param structBase is the request message to send (generated code of fmq file). * @param funcReply is the reply callback. @@ -292,7 +292,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param structBase is the event message to send (generated code of fmq file). */ virtual void sendEvent(const PeerId& peerId, const StructBase& structBase) = 0; @@ -304,7 +304,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param metainfo is a key/value map of additional data besides the request data. Metainfo is very similar to HTTP headers. * @param structBase is the event message to send (generated code of fmq file). */ @@ -315,7 +315,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param structBase is the event message to send (generated code of fmq file). */ @@ -328,7 +328,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param metainfo is a key/value map of additional data besides the request data. Metainfo is very similar to HTTP headers. * @param structBase is the event message to send (generated code of fmq file). @@ -533,7 +533,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param structBase is the request message to send (generated code of fmq file). * @param correlationId is an ID that can be matched at the callback, which is registered @@ -548,7 +548,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param structBase is the request message to send (generated code of fmq file). * @param funcReply is the reply callback. @@ -564,7 +564,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param path is the path that shall be called at the remote entity * @param metainfo is a key/value map of additional data besides the request data. Metainfo is very similar to HTTP headers. * @param structBase is the request message to send (generated code of fmq file). @@ -579,7 +579,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param structBase is the request message to send (generated code of fmq file). * @param funcReply is the reply callback. * @return if successful, valid correlation ID. @@ -594,7 +594,7 @@ struct IRemoteEntity * @param peerId is the id of the peer. You can get it when you connect() to a peer, when you * call getAllPeers(), inside a peer event or by calling requestContext->peerId() inside a * command execution. The peerId belongs to this entity. Because an entity can have multiple - * connections to remote entities, a remote entity must be identified be the peerId. + * connections to remote entities, a remote entity must be identified by the peerId. * @param metainfo is a key/value map of additional data besides the request data. Metainfo is very similar to HTTP headers. * @param structBase is the request message to send (generated code of fmq file). * @param funcReply is the reply callback. @@ -621,6 +621,7 @@ struct IRemoteEntity /** * @brief creates a peer at the own entity, so that the peer will publish events to the session. * This can be used e.g. for mqtt sessions. + * @param session is the session that is used for this entity-to-entity connection. * @param entityName is the name of the destination. The mqtt topic will look like this: "//". * @param triggerPeerEvent will cause a peer event, if true * @return the created peer id. @@ -635,6 +636,22 @@ struct IRemoteEntity */ virtual std::string getTypeOfGeneralMessage(const std::string& path) = 0; + /** + * @brief at a request, you can connect the peer. the peer will be connected (created) in case the peer was not already connected. + * @param session is the session that is used for this entity-to-entity connection. + * @param virtualSessionId the virtual session ID + * @param entityId is the ID of the remote entity to which this entity shall be connected. + * @return the (created) peer id. + */ + virtual PeerId connectPeer(const SessionInfo& session, const std::string& virtualSessionId, EntityId entityId) = 0; + + /** + * @brief removes a peer in case it exists + * @param peerId is the id of the peer. You can get it when you connect() to a peer. + */ + virtual void disconnectPeer(PeerId peerId) = 0; + + private: // methods for RemoteEntityContainer virtual void sessionDisconnected(const IProtocolSessionPtr& session) = 0; diff --git a/inc/finalmq/remoteentity/RemoteEntity.h b/inc/finalmq/remoteentity/RemoteEntity.h index b29515b4..536ad3bd 100644 --- a/inc/finalmq/remoteentity/RemoteEntity.h +++ b/inc/finalmq/remoteentity/RemoteEntity.h @@ -295,6 +295,8 @@ class SYMBOLEXP RemoteEntity : public IRemoteEntity, private std::enable_shared_ virtual void registerReplyEvent(FuncReplyEvent funcReplyEvent) override; virtual PeerId createPublishPeer(const SessionInfo& session, const std::string& entityName, bool triggerPeerEvent = true) override; virtual std::string getTypeOfGeneralMessage(const std::string& path) override; + virtual PeerId connectPeer(const SessionInfo& session, const std::string& virtualSessionId, EntityId entityId) override; + virtual void disconnectPeer(PeerId peerId) override; private: virtual void sessionDisconnected(const IProtocolSessionPtr& session) override; @@ -303,6 +305,10 @@ class SYMBOLEXP RemoteEntity : public IRemoteEntity, private std::enable_shared_ virtual void receivedReply(const ReceiveData& receiveData) override; virtual void deinit() override; +protected: + hybrid_ptr getWeakPtr(); + +private: PeerId connectIntern(const SessionInfo& session, const std::string& virtualSessionId, const std::string& entityName, EntityId, const std::shared_ptr& funcReplyConnect); void connectIntern(PeerId peerId, const SessionInfo& session, const std::string& entityName, EntityId entityId); void removePeer(PeerId peerId, Status status); diff --git a/src/remoteentity/RemoteEntity.cpp b/src/remoteentity/RemoteEntity.cpp index 4eb48ed6..05f04914 100644 --- a/src/remoteentity/RemoteEntity.cpp +++ b/src/remoteentity/RemoteEntity.cpp @@ -717,6 +717,45 @@ std::string RemoteEntity::getTypeOfGeneralMessage(const std::string& /*path*/) return {}; } +PeerId RemoteEntity::connectPeer(const SessionInfo& session, const std::string& virtualSessionId, EntityId entityId) +{ + static std::string STR_DUMMY = ""; + bool added{}; + std::uint64_t srcid = entityId; + //if (srcid == 0) + //{ + // srcid = ENTITYID_INVALID; + //} + PeerId peerId = m_peerManager->addPeer(session, virtualSessionId, srcid, STR_DUMMY, true, added, nullptr); + return peerId; +} + +void RemoteEntity::disconnectPeer(PeerId peerId) +{ + removePeer(peerId, Status::STATUS_PEER_DISCONNECTED); +} + + +hybrid_ptr RemoteEntity::getWeakPtr() +{ +#ifdef FINALMQ_HAS_NOT_WEAK_FROM_THIS + std::weak_ptr thisEntityWeak = *reinterpret_cast*>(static_cast*>(this)); +#else + std::weak_ptr thisEntityWeak = weak_from_this(); +#endif + hybrid_ptr thisEntity; + if (thisEntityWeak.lock()) + { + thisEntity = thisEntityWeak; + } + else + { + thisEntity = this; + } + + return thisEntity; +} + void RemoteEntity::sendConnectEntity(PeerId peerId, IRemoteEntityContainer& entityContainer, const std::shared_ptr& funcReplyConnect) { @@ -726,20 +765,7 @@ void RemoteEntity::sendConnectEntity(PeerId peerId, IRemoteEntityContainer& enti if (!registered) { // entity not registered, yet - hybrid_ptr thisEntity; -#ifdef FINALMQ_HAS_NOT_WEAK_FROM_THIS - std::weak_ptr thisEntityWeak = *reinterpret_cast*>(static_cast*>(this)); -#else - std::weak_ptr thisEntityWeak = weak_from_this(); -#endif - if (thisEntityWeak.lock()) - { - thisEntity = thisEntityWeak; - } - else - { - thisEntity = this; - } + hybrid_ptr thisEntity = getWeakPtr(); entityContainer.registerEntity(thisEntity); } @@ -1122,14 +1148,7 @@ void RemoteEntity::receivedRequest(ReceiveData& receiveData) { if (receiveData.automaticConnect) { - static std::string STR_DUMMY = "dummy"; - bool added{}; - std::uint64_t srcid = receiveData.header.srcid; - if (srcid == 0) - { - srcid = ENTITYID_INVALID; - } - m_peerManager->addPeer(receiveData.session, receiveData.virtualSessionId, srcid, STR_DUMMY, true, added, nullptr); + connectPeer(receiveData.session, receiveData.virtualSessionId, receiveData.header.srcid); } std::shared_ptr func; diff --git a/src/remoteentity/RemoteEntityFormatRegistry.cpp b/src/remoteentity/RemoteEntityFormatRegistry.cpp index 528f9a7d..d3f13b5e 100644 --- a/src/remoteentity/RemoteEntityFormatRegistry.cpp +++ b/src/remoteentity/RemoteEntityFormatRegistry.cpp @@ -224,6 +224,11 @@ void RemoteEntityFormatRegistryImpl::send(const IProtocolSessionPtr& session, co assert(session); if (shallSend(header, session)) { + if (structBase && (header.type == GeneralMessage::structInfo().getTypeName())) + { + const GeneralMessage* generalMessage = static_cast(structBase); + header.type = generalMessage->type; + } const finalmq::Bytes* pureData = nullptr; IMessagePtr message = session->createMessage(); assert(message);