Python API for QueueValue is not implemented #2743

Closed
vsbogd opened this issue Aug 4, 2020 · 17 comments · Fixed by #2749

Comments

@vsbogd
Contributor

vsbogd commented Aug 4, 2020

The following code returns an error:

from opencog.type_constructors import *
from opencog.execute import execute_atom
from opencog.utilities import initialize_opencog, finalize_opencog

atomspace = AtomSpace()
initialize_opencog(atomspace)

query_link = MeetLink(VariableNode("$test"), MemberLink(VariableNode("$test"), ConceptNode("Constant")))

result2 = execute_atom(atomspace, query_link)

the output:

Traceback (most recent call last):
  File "./test.py", line 10, in <module>
    result2 = execute_atom(atomspace, query_link)
  File "execute.pyx", line 17, in opencog.execute.execute_atom
  File "nameserver.pyx", line 90, in opencog.atomspace.create_python_value_from_c_value
TypeError: Python API for QueueValue is not implemented yet
@linas
Member

linas commented Aug 4, 2020

QueueValue specializes LinkValue, so I think the fix could be just one line of code: find the line in the Cython code (some .pyx file) that says if t == LinkValue: and replace it with if nameserver.isA(t, LinkValue): and everything should work ...
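
To illustrate why an exact type comparison misses QueueValue while an is-a check catches it, here is a minimal pure-Python sketch. The `PARENT` table, `wrap_exact` and `wrap_is_a` are hypothetical stand-ins invented for this example; they are not the real Cython bindings or the nameserver API.

```python
# Hypothetical stand-in for the type hierarchy; not the real nameserver.
PARENT = {
    "Value": None,
    "FloatValue": "Value",
    "LinkValue": "Value",
    "QueueValue": "LinkValue",
}


def is_a(type_name, ancestor):
    """Return True if type_name is ancestor or inherits from it."""
    while type_name is not None:
        if type_name == ancestor:
            return True
        type_name = PARENT[type_name]
    return False


def wrap_exact(type_name):
    """Dispatch on an exact match only: subtypes fall through."""
    if type_name == "LinkValue":
        return "LinkValue wrapper"
    raise TypeError(f"Python API for {type_name} is not implemented yet")


def wrap_is_a(type_name):
    """Dispatch on inheritance: subtypes reuse the parent's wrapper."""
    if is_a(type_name, "LinkValue"):
        return "LinkValue wrapper"
    raise TypeError(f"Python API for {type_name} is not implemented yet")
```

With this sketch, `wrap_exact("QueueValue")` raises the same kind of TypeError as in the report, while `wrap_is_a("QueueValue")` returns the LinkValue wrapper.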

@ntoxeg
Contributor

ntoxeg commented Aug 7, 2020

Okay, I have to withdraw my previous (deleted) comment, unfortunately (I had a typo in the code). While it is possible to create Values in a unified way in create_python_value_from_c_value, add_link expects that the outgoing sets of Links contain only Atoms. Is that check now irrelevant, and should it become a Value check, or are there special cases here (such as both Atoms and non-Atom Values inside)?

@ntoxeg
Contributor

ntoxeg commented Aug 7, 2020

Actually while we're at it: what is a LinkValue? Isn't it supposed to be ListValue? This type's name sounds like an oxymoron - is it a Link (more than a Value) or just a Value?

Edit: okay, skimming through the code it seems the "LinkValue" is like Link<Value> - which makes more sense when looked at this way (still will be confusing for new people).

@ntoxeg
Contributor

ntoxeg commented Aug 7, 2020

Okay, the example went back to returning (QueueValue) after fixing. Is that correct behaviour?

@vsbogd
Contributor Author

vsbogd commented Aug 7, 2020

@ntoxeg, as @linas said, it should be enough to add:

    # For handling the children types of LinkValue.
    if is_a(value_type, types.LinkValue):
        return LinkValue(ptr_holder = ptr_holder)

before

    # For handling the children types of TruthValue.
    if is_a(value_type, types.TruthValue):
        return TruthValue(ptr_holder = ptr_holder)
    # For handling the children types of Atom.
    if is_a(value_type, types.Atom):
        return Atom(ptr_holder = ptr_holder)

But I don't see why you are talking about add_link, because it should not be called for a Value. Is it another issue, not directly related to QueueValue?

@vsbogd
Contributor Author

vsbogd commented Aug 7, 2020

> Okay, the example went back to returning (QueueValue) after fixing. Is that correct behaviour?

I think so.

@linas
Member

linas commented Aug 7, 2020

> > Okay, the example went back to returning (QueueValue) after fixing. Is that correct behaviour?
>
> I think so.

It's correct if the query is meant to return no results.

QueueValue is a thread-safe FIFO -- multiple writers in multiple threads can write to it, multiple readers in multiple threads can read from it, and it will stay consistent. It is used to accumulate search results, as they are found.
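
As a rough illustration of that behaviour, the sketch below uses Python's standard `queue.Queue` as a stand-in for a thread-safe FIFO; it is not QueueValue itself, and `run_demo` with its parameters is a hypothetical helper written for this example. Several writer threads add items, several reader threads drain them, and nothing is lost or duplicated.

```python
import queue
import threading


def run_demo(n_writers=3, n_readers=2, items_per_writer=100):
    """Push items from several writers and drain them with several readers."""
    fifo = queue.Queue()          # stdlib thread-safe FIFO (stand-in only)
    collected = []
    collected_lock = threading.Lock()
    done = object()               # sentinel telling a reader to stop

    def writer(writer_id):
        for i in range(items_per_writer):
            fifo.put((writer_id, i))

    def reader():
        while True:
            item = fifo.get()     # blocks until something is available
            if item is done:
                break
            with collected_lock:
                collected.append(item)

    writers = [threading.Thread(target=writer, args=(w,)) for w in range(n_writers)]
    readers = [threading.Thread(target=reader) for _ in range(n_readers)]
    for t in writers + readers:
        t.start()
    for t in writers:
        t.join()
    for _ in readers:             # one sentinel per reader, after all real items
        fifo.put(done)
    for t in readers:
        t.join()
    return collected


if __name__ == "__main__":
    results = run_demo()
    print(len(results), "items received,", len(set(results)), "distinct")
```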

A LinkValue is just a holder of multiple values. It's not thread-safe.

Streams are values that change over time -- so a LinkStream is a LinkValue whose contents may or may not change over time. Whether or not it's thread-safe, or how it might work, depends on what kind of stream it is, i.e. it's implementation-dependent on the particular stream.

Besides queues, other examples of streams might be video and audio. That does not mean that we actually store video and audio in the atomspace; merely that they can be sampled whenever needed, or otherwise operated on, e.g. with PlusLink to combine two audio streams.

@linas
Member

linas commented Aug 7, 2020

and .. ohh... BTW, @vsbogd I should mention -- since we're on the topic -- not only can you apply a PlusLink to two audio streams (if we had audio streams, which we don't) one should be able to do the same for neural net values or tensorflow, etc.

So if you had (for example) some NeuralNetStream, subclassed from FloatStream, then it could (for example) hold some pointer to something cranking along on a GPU somewhere. If you wanted to know what the current weight-vector on that GPU was "right now", you could just sample from that FloatStream and get those values.

Or maybe instead, if you wanted to specify some particular set of processing steps (like what tensorflow does), you could say, for example, "I want to combine nets A, B and C", and use PlusLink for that. Or maybe some other CombinerLink or whatever. The actual processing still happens on the GPUs, and almost no data at all has to move between atomspace and GPU; all that the links and values do is specify how things are connected together.

There's a working example of this, using RandomValue as a stand-in for something more complex (e.g. video, audio, neural nets, gpu streams, etc.) and it's combined with PlusLink and TimesLink and GreaterThanLink and "it just works". It's in some examples directory, somewhere...
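
The idea of wiring streams together and only sampling on demand can be sketched in plain Python. The classes below (`Stream`, `RandomStream`, `ConstantStream`, `Plus`) are hypothetical names made up for this illustration; they are not the AtomSpace classes, and the real example mentioned above may work differently.

```python
import random


class Stream:
    """A value whose contents are produced each time it is sampled."""

    def sample(self):
        raise NotImplementedError


class ConstantStream(Stream):
    """Always yields the same vector."""

    def __init__(self, values):
        self._values = list(values)

    def sample(self):
        return list(self._values)


class RandomStream(Stream):
    """Yields a fresh vector of random floats in [0, 1) on every sample."""

    def __init__(self, length, seed=None):
        self._length = length
        self._rng = random.Random(seed)

    def sample(self):
        return [self._rng.random() for _ in range(self._length)]


class Plus(Stream):
    """Describes how two streams are combined; no data moves until sampled."""

    def __init__(self, left, right):
        self._left = left
        self._right = right

    def sample(self):
        return [a + b for a, b in zip(self._left.sample(), self._right.sample())]


if __name__ == "__main__":
    noisy = Plus(RandomStream(3), ConstantStream([10.0, 20.0, 30.0]))
    print(noisy.sample())   # a different result on each call
    print(noisy.sample())
```

Constructing `Plus(...)` only records the wiring; the underlying sources are consulted each time `sample()` is called, which is the property the comment above is describing.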

@vsbogd
Contributor Author

vsbogd commented Aug 8, 2020

> QueueValue is a thread-safe FIFO -- multiple writers in multiple threads can write to it, multiple readers in multiple threads can read from it, and it will stay consistent. It is used to accumulate search results, as they are found.

It would be convenient if LinkStreamValue provided a stream-specific interface. At the moment it can only return a list of values, as LinkValue does, and it is not possible to wait for the next stream value.

> There's a working example of this, using RandomValue as a stand-in for something more complex (e.g. video, audio, neural nets, gpu streams, etc.) and it's combined with PlusLink and TimesLink and GreaterThanLink and "it just works". It's in some examples directory, somewhere...

You probably mean the stream.scm example?

@vsbogd
Contributor Author

vsbogd commented Aug 8, 2020

I would also mention @noskill here.

@linas
Member

linas commented Aug 8, 2020

> It would be convenient if

Propose one. As a general rule, I try to under-design/under-specify brand-new interfaces and brand-new ideas. That way, I can avoid adding bad/broken/poorly designed APIs in the early stages. The correct/best API "materializes out of thin air" with the first 2 or 3 "practical applications", when it becomes clear how it should have worked. The downside of under-design is that handy utilities are missing in the early stages. They have to be added...

> LinkStreamValue provide a stream specific interface

The "stream" here would be a stream of LinkValues. If you just want a stream of any kind of values, the base class would be StreamValue. (The current StreamValue needs to be renamed to FloatStream, so that's a bug.)

> it is not possible to wait for next stream value.

But QueueValue does allow you to wait. That is why it exists! That's the "whole point of it". Note, however, that waiting, in general, is bad: if you have a video stream, you typically don't want to wait, as that will just block and you'll collapse under the bandwidth. So all of the values (not just the stream values) were designed to "sample" current values. They have an update() which is meant to set "the current value".

@linas
Member

linas commented Aug 8, 2020

> The "stream" here would be a stream of LinkValues.

err... or not. Some confusion here. Need to clarify this, per #2750

@vsbogd
Contributor Author

vsbogd commented Aug 10, 2020

> But QueueValue does allow you to wait. That is why it exists! That's the "whole point of it". Note, however, that waiting, in general, is bad ..

By "wait" I mean being notified when the next value appears in the stream. It is not necessarily blocking, but it allows the user of the QueueValue class to get the next value at the moment it arrives. I see that update does something similar. I didn't get it at first, because the name of the method is unusual. But update is also different because, as I understand it, it waits until all of the values have arrived. So it doesn't allow streaming; it collects all values at once.

Nevertheless, one could use it to rewrite the example above as something like:

query_link = MeetLink(VariableNode("$test"), MemberLink(VariableNode("$test"), ConceptNode("Constant")))
result2 = execute_atom(atomspace, query_link)
result2.update()
print("results:", result2.to_list())

@vsbogd
Contributor Author

vsbogd commented Aug 10, 2020

> The "stream" here would be a stream of LinkValues.

I see in the code that QueueValue is a queue of ValuePtr, so I'm not sure why you say it is a stream of LinkValue?

@linas
Member

linas commented Aug 10, 2020

> update ...

The update() method is protected and virtual, meant to be overloaded. (so apps cannot call it directly). You can make update() do anything at all... no rules there. However, the general idea was that when an app did something like print("results:", result2.to_list()), that the to_list() method would call update() under the covers to get the latest values. The idea was that there would be a single common to_list() that everyone shares, and only the update() part would need to be different for each kind of Value.
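
A minimal Python sketch of that design, with one shared `to_list()` and a per-class update hook, might look like the following. The class names and the `_update` spelling are assumptions made for illustration (Python has no protected methods, so a leading underscore stands in for that); this is not the actual C++ or Cython code.

```python
import time


class Value:
    """Base class: to_list() is shared, _update() is the overridable hook."""

    def __init__(self, items=()):
        self._items = list(items)

    def _update(self):
        """Refresh self._items; the default keeps whatever is stored."""

    def to_list(self):
        self._update()            # called under the covers on every read
        return list(self._items)


class ClockValue(Value):
    """Samples a monotonic clock whenever it is read."""

    def _update(self):
        self._items = [time.monotonic()]


class CounterValue(Value):
    """Reports how many times it has been read so far."""

    def __init__(self):
        super().__init__()
        self._reads = 0

    def _update(self):
        self._reads += 1
        self._items = [self._reads]


if __name__ == "__main__":
    print(Value([1, 2, 3]).to_list())
    print(ClockValue().to_list())
```

Callers only ever use `to_list()`; each subclass decides what "the latest values" means by overriding the hook.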

> notify ...

There are three styles of notification: blocking until some action is done; polling in a loop ("are you done yet?"); and callbacks, i.e. installing some snippet of code that is called when the action is done. So QueueValue implements blocking...
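
The three styles can be shown side by side with Python's standard `concurrent.futures` module. This is only an analogy using a stdlib Future, not QueueValue; `slow_query` and `demo` are hypothetical helpers written for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_query():
    """Stand-in for some work that takes a while to finish."""
    time.sleep(0.05)
    return ["a", "b"]


def demo():
    seen = {}
    with ThreadPoolExecutor(max_workers=3) as pool:
        # 1. Blocking: result() does not return until the work is done.
        seen["blocking"] = pool.submit(slow_query).result()

        # 2. Polling: repeatedly ask "are you done yet?".
        future = pool.submit(slow_query)
        while not future.done():
            time.sleep(0.01)
        seen["polling"] = future.result()

        # 3. Callback: install code that runs once the work is done.
        future = pool.submit(slow_query)
        future.add_done_callback(
            lambda f: seen.__setitem__("callback", f.result())
        )
    # Leaving the with-block waits for the workers, so the callback has run.
    return seen


if __name__ == "__main__":
    print(demo())
```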

The current QueueValue is a combo of several parts: a thread-safe FIFO plus some quick hacks. The thread-safe FIFO does what you want. The quick hack then hides it in backwards-compatible junk. So the old, and still current, API for result2 = execute_atom(atomspace, query_link) behaves as if it were single-threaded. Thus, execute_atom never returns (aka "blocks") until it is completely done. Under the covers, this "secretly" uses threads and FIFOs ... but hides that from the user.

I did it this way because I wanted to have the multi-threaded capabilities in place and ready to be used, but didn't want to break backwards-compat. The problem is that almost no one cares, so I wasn't going to go through contortions and complete exhaustion to carefully design something that no one cares about and no one will use. So the fancy threads and queues are hidden, for now.

However, if you are ready to use this, then the next step would be to write a multi-threaded demo, showing how to start the execute_atom in one thread, and how to wait for and process the results in another .... part of this demo would be to remove/fix/change whatever hacks I used to make it backwards-compat (it still has to stay backwards compat, so we need to invent something that keeps that, while also exposing the new function... this was the work I was unwilling to do... )

BTW, cogutils has three things: a concurrent FIFO, a LIFO (last-in, first-out aka a "stack") and a de-duplicating set (arbitrary order; duplicates are auto-removed/ignored). They all have two modes: "open", where you can add/remove items, and "closed", where no more items can be added, indicating that the work is "done". Because QueueValue inherits from concurrent_queue, all of the methods on concurrent_queue are also on QueueValue, so you can call any of them (that are public). Look at cogutils-git/opencog/util/concurrent_queue.h
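
To make the open/closed idea concrete, and to hint at what a producer thread plus a consumer thread could look like, here is a small Python sketch. `ClosableQueue`, `QueueClosed`, `push`, `pop`, `close` and `consume` are hypothetical names chosen for this example; they are not the concurrent_queue API, whose method names and semantics should be checked in the header mentioned above.

```python
import threading
from collections import deque


class QueueClosed(Exception):
    """Raised when pushing to a closed queue or popping a closed, empty one."""


class ClosableQueue:
    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()
        self._closed = False

    def push(self, item):
        with self._cond:
            if self._closed:
                raise QueueClosed("cannot push to a closed queue")
            self._items.append(item)
            self._cond.notify()

    def close(self):
        """Signal that the work is done: no more items will arrive."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()

    def pop(self):
        """Block until an item arrives; raise once closed and drained."""
        with self._cond:
            while not self._items and not self._closed:
                self._cond.wait()
            if self._items:
                return self._items.popleft()
            raise QueueClosed("queue is closed and empty")


def consume(q):
    """Process results as they arrive, until the producer closes the queue."""
    out = []
    while True:
        try:
            out.append(q.pop())
        except QueueClosed:
            return out


if __name__ == "__main__":
    q = ClosableQueue()

    def produce():
        for i in range(5):
            q.push(i)
        q.close()

    threading.Thread(target=produce).start()
    print(consume(q))
```

The consumer never needs to know how many results to expect; closing the queue is what tells it to stop.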

FYI, there is also async_buffer.h and async_method_caller.h -- the async-buffer also adds hi/low watermarks to keep the buffer from getting too full; it also automatically manages the readers -- the constructor takes a C++ callback and number of threads, and the callback will be called whenever there's data. The async_method_caller.h is exactly the same, except it uses a queue instead of a set (so the buffered messages are kept in sequential order) -- The Logger uses this for logging. The SQL backend uses it for collecting up atoms to be written out to the sql server. The cog-client uses this to open 4 tcp/ip sockets to the cogserver, and send atoms in parallel (and then wait/block until the replies arrive). ... so this stuff is heavily used in multi-threaded ways .. just, not yet for execute_atom().
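
The "constructor takes a callback and a number of threads" pattern can be sketched with the Python standard library. `AsyncCaller`, `enqueue` and `barrier` are hypothetical names for this illustration, and a single bounded `queue.Queue` is used here as a simplification in place of separate high and low watermarks; the real C++ classes may differ.

```python
import queue
import threading


class AsyncCaller:
    """Run callback(item) on worker threads for every enqueued item."""

    def __init__(self, callback, num_threads=2, high_watermark=8):
        self._callback = callback
        # A bounded queue makes writers block when the buffer is full.
        self._queue = queue.Queue(maxsize=high_watermark)
        for _ in range(num_threads):
            threading.Thread(target=self._drain, daemon=True).start()

    def enqueue(self, item):
        self._queue.put(item)     # blocks while the buffer is full

    def _drain(self):
        while True:
            item = self._queue.get()
            try:
                self._callback(item)
            finally:
                self._queue.task_done()

    def barrier(self):
        """Wait until every enqueued item has been handled."""
        self._queue.join()


if __name__ == "__main__":
    written = []
    lock = threading.Lock()

    def write_out(item):
        with lock:
            written.append(item)

    caller = AsyncCaller(write_out, num_threads=4)
    for i in range(20):
        caller.enqueue(i)
    caller.barrier()
    print(sorted(written))
```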

@vsbogd
Contributor Author

vsbogd commented Aug 11, 2020

I see that update is already used in LinkValue::value so the following code blocks and returns all results:

query_link = MeetLink(VariableNode("$test"), MemberLink(VariableNode("$test"), ConceptNode("Constant")))
result2 = execute_atom(atomspace, query_link)
print("results:", result2.to_list())

@linas
Member

linas commented Aug 11, 2020

> update is already used

Yes. Values are still sort-of experimental-ish. I'm not entirely happy with having std::vector in them by default, and having update(), but couldn't think of anything better.
