Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Get/Put links #2316

Closed
3 changes: 3 additions & 0 deletions opencog/atoms/atom_types/atom_types.script
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ FLOAT_VALUE <- VALUE // vector of floats, actually.
STRING_VALUE <- VALUE // vector of strings
LINK_VALUE <- VALUE // vector of values ("link" holding values)
VALUATION <- VALUE // (atom,key,value) triple
QUEUE_VALUE <- VALUE // queue value to pass results from one atom to another

// ===========================================================
// TruthValues are the subobject classifiers for Atomese; they are used
Expand Down Expand Up @@ -388,6 +389,7 @@ SATISFYING_LINK <- PATTERN_LINK
GET_LINK <- SATISFYING_LINK // Finds all groundings, returns them
QUERY_LINK <- SATISFYING_LINK // Finds all groundings, substitutes.
BIND_LINK <- QUERY_LINK // Finds all groundings, substitutes.
PARALLEL_GET_LINK <- SATISFYING_LINK // Finds all groundings, puts them to queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be inherited from PATTERN_LINK instead?


// Adjoint to the GetLink. This is "adjoint" in the sense that the roles
// of the pattern and the grounding are reversed: given a grounding, the
Expand Down Expand Up @@ -489,6 +491,7 @@ INTERVAL_LINK <- ORDERED_LINK
// ListLink, to indicate that they are either done, or awaiting
// processing.
ANCHOR_NODE <- NODE
SET_NODE <- NODE

// ====================================================================
// Relations. A "relation" is a subset of a Cartesian product, or more
Expand Down
48 changes: 37 additions & 11 deletions opencog/atoms/core/PutLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "DefineLink.h"
#include "LambdaLink.h"
#include "PutLink.h"
#include <opencog/atomspace/AtomSpace.h>
#include <opencog/atoms/value/QueueValue.h>

using namespace opencog;

Expand Down Expand Up @@ -255,6 +257,16 @@ static inline Handle reddy(PrenexLinkPtr& subs, const HandleSeq& oset)
return subs->beta_reduce(oset);
}

static inline void reddy(HandleSeq& bset, PrenexLinkPtr& subs, const HandleSeq& oset)
{
try
{
bset.emplace_back(reddy(subs, oset));
}
catch (const TypeCheckException& ex) {}

}

// If arg is executable, then run it, and unwrap the set link, too.
// We unwrap the SetLinks cause that is what GetLinks return.
static inline Handle expand(const Handle& arg, bool silent)
Expand Down Expand Up @@ -431,24 +443,38 @@ Handle PutLink::do_reduce(void) const
// If there is only one variable in the PutLink body...
if (1 == nvars)
{
if (SET_LINK != vtype)

if (SET_LINK == vtype)
{
return reddy(subs, {args});
// If the arguments are given in a set, then iterate over the set...
HandleSeq bset;
for (const Handle& h : args->getOutgoingSet())
{
reddy(bset, subs, {h});
}
return createLink(bset, SET_LINK);
}

// If the arguments are given in a set, then iterate over the set...
HandleSeq bset;
for (const Handle& h : args->getOutgoingSet())
if (SET_NODE == vtype)
{
HandleSeq oset;
oset.emplace_back(h);
try
// If the argument is SetNode, then process atoms from queue
// stored in SetNode value
AtomSpace* as = getAtomSpace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is atomspace required here? It is not used in code below

ValuePtr value = args->getValue(QueueValue::QUEUE_VALUE_KEY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to DEFAULT_QUEUE_VALUE_KEY. We should also probably do the same thing for DEFAULT_TRUTH_VALUE_KEY also (in some other pull req).

if (as && value)
{
bset.emplace_back(reddy(subs, oset));
Handle h;
HandleSeq bset;
HandleClosableQueuePtr queue = QueueValueCast(value)->get_queue();
while (queue->pop(h))
{
reddy(bset, subs, {h});
}
return createLink(bset, SET_LINK);
}
catch (const TypeCheckException& ex) {}
}
return createLink(bset, SET_LINK);

return reddy(subs, {args});
}

// If we are here, then there are multiple variables in the body.
Expand Down
15 changes: 15 additions & 0 deletions opencog/atoms/core/ScopeLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ void ScopeLink::extract_variables(const HandleSeq& oset)
if (UNQUOTE_LINK == decls)
return;

// oset:
// variables
// body
// target
if (oset.size() == 3)
{
_vardecl = oset[0];
_body = oset[1];
_target = oset[2];

// Initialize _varlist with the scoped variables
init_scoped_variables(_vardecl);
return;
}

// If the first atom is not explicitly a variable declaration, then
// there are no variable declarations. There are two cases that can
// apply here: either the body is a lambda, in which case, we copy
Expand Down
2 changes: 2 additions & 0 deletions opencog/atoms/core/ScopeLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class ScopeLink : public Link
/// Handle of the body of the expression.
Handle _body;

Handle _target;

/// Variables bound in the body.
Variables _varlist;

Expand Down
2 changes: 2 additions & 0 deletions opencog/atoms/pattern/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ ADD_LIBRARY (lambda
BindLink.cc
DualLink.cc
GetLink.cc
ParallelGetLink.cc
PatternLink.cc
PatternTerm.cc
PatternUtils.cc
Expand All @@ -32,6 +33,7 @@ INSTALL (FILES
BindLink.h
DualLink.h
GetLink.h
ParallelGetLink.h
PatternLink.h
Pattern.h
PatternTerm.h
Expand Down
94 changes: 94 additions & 0 deletions opencog/atoms/pattern/ParallelGetLink.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* ParallelGetLink.cc
*
* Copyright (C) 2019 Linas Vepstas
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License v3 as
* published by the Free Software Foundation and including the
* exceptions
* at http://opencog.org/wiki/Licenses
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License
* along with this program; if not, write to:
* Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include <opencog/atoms/atom_types/NameServer.h>
#include <opencog/atoms/core/UnorderedLink.h>
#include <opencog/query/Satisfier.h>

#include "ParallelGetLink.h"

using namespace opencog;

void ParallelGetLink::init(void)
{
Type t = get_type();
if (not nameserver().isA(t, PARALLEL_GET_LINK))
{
const std::string& tname = nameserver().getTypeName(t);
throw InvalidParamException(TRACE_INFO,
"Expecting a ParallelGetLink, got %s", tname.c_str());
}
}

ParallelGetLink::ParallelGetLink(const HandleSeq& hseq, Type t)
: PatternLink(hseq, t)
{
init();
}

ParallelGetLink::ParallelGetLink(const Link &l)
: PatternLink(l)
{
init();
}

/* ================================================================= */

HandleSet ParallelGetLink::do_execute(AtomSpace* as, bool silent)
{
if (nullptr == as) as = _atom_space;

ParallelSatisfier sater(as, _target);
this->satisfy(sater);

return sater._satisfying_set;
}

ValuePtr ParallelGetLink::execute(AtomSpace* as, bool silent)
{


#define PLACE_RESULTS_IN_ATOMSPACE
#ifdef PLACE_RESULTS_IN_ATOMSPACE
// Shoot. XXX FIXME. Most of the unit tests require that the atom
// that we return is in the atomspace. But it would be nice if we
// could defer this indefinitely, until its really needed.

HandleSet handle_set = do_execute(as, silent);
for (auto h: handle_set)
{

HandleSeq handle_seq;
handle_seq.push_back(h);
handle_seq.push_back(_target);
Handle member_link = createLink(handle_seq, MEMBER_LINK);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't add results to the atomspace in the ParallelSatisfier instead? Am I right that current implementation collects results as SetLink and then puts them into atomspace as MemberLink? It would be simpler to put them as MemberLink without using a Set as intermediate state.

if (as) member_link = as->add_atom(member_link);
}
#endif /* PLACE_RESULTS_IN_ATOMSPACE */

return _target;
}

DEFINE_LINK_FACTORY(ParallelGetLink, PARALLEL_GET_LINK)

/* ===================== END OF FILE ===================== */
59 changes: 59 additions & 0 deletions opencog/atoms/pattern/ParallelGetLink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* opencog/atoms/pattern/ParallelGetLink.h
*
* Copyright (C) 2019 Linas Vepstas
* All Rights Reserved
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License v3 as
* published by the Free Software Foundation and including the exceptions
* at http://opencog.org/wiki/Licenses
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program; if not, write to:
* Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef _OPENCOG_PARALLEL_GET_LINK_H
#define _OPENCOG_PARALLEL_GET_LINK_H

#include <opencog/atoms/pattern/PatternLink.h>

namespace opencog
{
/** \addtogroup grp_atomspace
* @{
*/
class ParallelGetLink : public PatternLink
{
protected:
void init(void);
virtual HandleSet do_execute(AtomSpace*, bool silent);

public:
ParallelGetLink(const HandleSeq&, Type=PARALLEL_GET_LINK);
explicit ParallelGetLink(const Link &l);

virtual bool is_executable() const { return true; }
virtual ValuePtr execute(AtomSpace*, bool silent=false);

static Handle factory(const Handle&);
};

typedef std::shared_ptr<ParallelGetLink> ParallelGetLinkPtr;
static inline ParallelGetLinkPtr ParallelGetLinkCast(const Handle& h)
{ AtomPtr a(h); return std::dynamic_pointer_cast<ParallelGetLink>(a); }
static inline ParallelGetLinkPtr ParallelGetLinkCast(AtomPtr a)
{ return std::dynamic_pointer_cast<ParallelGetLink>(a); }

#define createParallelGetLink std::make_shared<ParallelGetLink>

/** @}*/
}

#endif // _OPENCOG_PARALLEL_GET_LINK_H
4 changes: 3 additions & 1 deletion opencog/atoms/pattern/PatternLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ void PatternLink::init(void)
// not set.
if (nullptr == _body) return;

if (2 < _outgoing.size() or
// TODO: Update size check conditions
if (3 < _outgoing.size() or
(1 == _outgoing.size() and _outgoing[0] != _body) or
(2 == _outgoing.size() and _outgoing[1] != _body))
{
throw InvalidParamException(TRACE_INFO,
Expand Down
2 changes: 2 additions & 0 deletions opencog/atoms/value/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ADD_LIBRARY (value
RandomStream.cc
StreamValue.cc
StringValue.cc
QueueValue.cc
ValueFactory.cc
)

Expand All @@ -29,6 +30,7 @@ INSTALL (FILES
RandomStream.h
StreamValue.h
StringValue.h
QueueValue.h
ValueFactory.h
DESTINATION "include/opencog/atoms/value"
)
47 changes: 47 additions & 0 deletions opencog/atoms/value/QueueValue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* opencog/atoms/value/QueueValue.cc
*
* Copyright (C) 2015, 2016 Linas Vepstas
* All Rights Reserved
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License v3 as
* published by the Free Software Foundation and including the exceptions
* at http://opencog.org/wiki/Licenses
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program; if not, write to:
* Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include <opencog/atoms/value/QueueValue.h>
#include <opencog/atoms/value/ValueFactory.h>

using namespace opencog;

const Handle QueueValue::QUEUE_VALUE_KEY = createNode(CONCEPT_NODE, "QUEUE_VALUE_KEY");

bool QueueValue::operator==(const Value& other) const
{
if (QUEUE_VALUE != other.get_type()) return false;

const QueueValue* cv = (const QueueValue*) &other;
return _name == cv->_name;
}

std::string QueueValue::to_string(const std::string& indent) const
{
std::string rv = indent + "(" + nameserver().getTypeName(_type);
rv += std::string(" \"") + _name + "\")";
return rv;
}

// Adds factory when library is loaded.
DEFINE_VALUE_FACTORY(QUEUE_VALUE,
createQueueValue, std::string)
Loading