Skip to content

Commit

Permalink
Now using AbstractJSONable.
Browse files Browse the repository at this point in the history
  • Loading branch information
halfak committed Aug 5, 2014
1 parent d85e6df commit d3f1e05
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 74 deletions.
102 changes: 50 additions & 52 deletions mwevents/sources/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
class RCListener:

def __init__(self, session, *, state_marker, events,
min_wait, rcs_per_request, stop):
max_wait, rcs_per_request, stop):
self.session = session

self.state_marker = state_marker
self.events = events
self.min_wait = min_wait
self.max_wait = max_wait
self.rcs_per_request = rcs_per_request
self.stop = stop
self.kwargs = {
Expand Down Expand Up @@ -57,7 +57,7 @@ def __iter__(self):


if len(rc_docs) < self.rcs_per_request:
time.sleep(min_wait - (time.time() - start))
time.sleep(self.max_wait - (time.time() - start))


class API:
Expand All @@ -71,61 +71,59 @@ class API:
def __init__(self, session):
self.session = session

def listener(self, state_marker=None, events=None, min_wait=5,
def listener(self, state_marker=None, events=None, max_wait=5,
rcs_per_request=100, direction="newer",
properties=RC_EVENT_PROPS, stop=lambda: False):
"""
:Example:
.. code-block:: python
import sys
from mwevents.sources import API
from mwevents import RevisionSaved, PageCreated
API_URL = "http://en.wikipedia.org/w/api.php"
try:
api_source = API.from_api_url(API_URL)
listener = api_source.listener(events={RevisionSaved,
PageCreated})
for event in listener:
if isinstance(event, RevisionSaved):
print("Revision {0} of {1} saved by {2}."\
.format(event.revision.id,
event.revision.page_id,
event.user))
else: # isinstance(event, PageCreated):
print("Page {0}:{1} created by {2}."\
.format(event.page.namespace,
event.page.title,
event.user))
except KeyboardInterrupt:
sys.stderr.write("Keyboard Interrupt caught. " + \
"Shutting down.\n")
sys.stderr.write(str(listener.state_marker.to_json()) + "\n")
"""
state_marker = StateMarker(state_marker) \
if state_marker is not None else StateMarker()

events = set(events) if events is not None else None

min_wait = float(min_wait)
rcs_per_request = int(rcs_per_request)
"""
:Example:
if not callable(stop):
raise TypeError("'stop' must be a callable function")
.. code-block:: python
return RCListener(self.session,
state_marker=state_marker,
events=events,
min_wait=min_wait,
rcs_per_request=rcs_per_request,
stop=stop)
from mwevents.sources import API
from mwevents import RevisionSaved, PageCreated
api_source = \
API.from_api_url("http://en.wikipedia.org/w/api.php")
listener = \
api_source.listener(events={RevisionSaved, PageCreated})
for event in listener:
if isinstance(event, RevisionSaved):
print(event.revision)
else: # isinstance(event, PageCreated):
print(event.page)
"""
state_marker = StateMarker(state_marker) \
if state_marker is not None \
else self._get_current_state()

events = set(events) if events is not None else None

max_wait = float(max_wait)
rcs_per_request = int(rcs_per_request)

if not callable(stop):
raise TypeError("'stop' must be a callable function")

return RCListener(self.session,
state_marker=state_marker,
events=events,
max_wait=max_wait,
rcs_per_request=rcs_per_request,
stop=stop)

def _get_current_state(self):
docs = list(self.session.recent_changes.query(properties={'ids',
'timestamp'},
limit=1))

if len(docs) > 0:
return StateMarker(Timestamp(docs[0]['timestamp']), docs[0]['rcid'])
else:
return StateMarker()

def query(self, *args, **kwargs): raise NotImplemented Error
def query(self, *args, **kwargs): raise NotImplementedError()

@classmethod
def from_api_url(cls, url):
Expand Down
157 changes: 157 additions & 0 deletions mwevents/sources/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@


def ArchiveRevisionLoggingListener:

def __init__(self, db, *, state_marker, events,
max_wait, records_per_request, stop):

self.db = db,
self.state_marker = state_marker
self.events = events
self.max_wait = max_wait
self.records_per_request = records_per_request
self.stop = stop

def __iter__(self):

archives = self._read_archives()
revisions = self._read_revisions()
loggings = self._read_logging()

for match, doc in

def _read_archives(self):

last_rev_id = None
last_timestamp = None

while not self.stop():

if last_rev_id is not None or \
self.state_marker.last_rev_id is not None:
archive_rows = self.db.archives.query(
after_id=last_rev_id or self.state_marker.last_rev_id,
direction="newer",
limit=self.records_after_request)

elif self.state_marker.last_timestamp is not None:
archive_rows = self.db.archive.query(
after=self.state_marker.last_event,
direction="newer",
limit=self.records_after_request)

else:
archive_rows = self.db.archive.query(
after_id=0,
direction="newer",
limit=self.records_after_request)


for archive_row in archive_rows:
yield Match.from_rev_row(archive_row), archive_row

if len(archive_rows) > 0

last_rev_id = archive_rows[-1]['rev_id'] or last_rev_id

def _read_revisions(self):

last_rev_id = None
last_timestamp = None

while not self.stop():

# TODO: Document assumption of rev_id order.
if last_rev_id is not None or \
self.state_marker.last_rev_id is not None:
rev_rows = self.db.revisions.query(
after_id=last_rev_id or self.state_marker.last_rev_id,
direction="newer",
limit=self.records_after_request)

elif self.state_marker.last_timestamp is not None:
rev_rows = self.db.revisions.query(
after=self.state_marker.last_event,
direction="newer",
limit=self.records_after_request)

else:
rev_rows = self.db.revisions.query(
after_id=0,
direction="newer",
limit=self.records_after_request)


for rev_row in rev_rows:
yield Match.from_rev_row(rev_row), rev_rows

if len(rev_rows) > 0
last_rev_id = rev_rows[-1]['rev_id'] or last_rev_id

def _read_logging(self):

last_log_id = None

while not self.stop():

# TODO: Document assumption of log_id order.
if last_log_id is not None or \
self.state_marker.last_log_id is not None:
log_rows = self.db.logging.query(
after_id=last_log_id or self.state_marker.last_log_id,
direction="newer",
limit=self.records_after_request)

elif self.state_marker.last_timestamp is not None:
log_rows = self.db.logging.query(
after=self.state_marker.last_event,
direction="newer",
limit=self.records_after_request)

else:
log_rows = self.db.logging.query(
after_id=0,
direction="newer",
limit=self.records_after_request)


for log_row in log_rows:
yield Match.from_log_row(log_row), log_row

if len(log_rows) > 0
last_rev_id = log_row[-1]['log_id'] or last_log_id



def Database:

def __init__(self, db):
self.db = db

def listener(self, state_marker=None,
events=None,
max_wait=5,
records_per_request=100,
stop=lambda: False)

state_marker = StateMarker(state_marker) if state_marker is not None \
else self._get_current_state()

events = set(events) if events is not None else None

max_wait = float(max_wait)
records_per_request = int(records_per_request)

if not callable(stop):
raise TypeError("'stop' must be a callable function")

return ArchiveRevisionLoggingListener(self.session,
state_marker=state_marker,
events=events,
max_wait=max_wait,
records_per_request=records_per_request,
stop=stop)

@class_method
def from_params(self, *args, **kwargs):
return Database(DB.from_params(*args, **kwargs))
6 changes: 5 additions & 1 deletion mwevents/sources/source.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@

class RCListener:

def __iter__(self): raise NotImplementedError()


class Source:

def listen(self, *args, **kwargs):
def listener(self, *args, **kwargs):
raise NotImplementedError()

def query(self, start, end, *args, types=None, **kwargs):
Expand Down
24 changes: 4 additions & 20 deletions mwevents/types/events/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,32 @@

from mw import Timestamp

from jsonable import JSONable
from jsonable import AbstractJSONable, JSONable

from .. import User
from ... import configuration
from .match import Match


class Event(JSONable):
class Event(AbstractJSONable):
__slots__ = ('timestamp', 'user', 'comment')
MATCHES = NotImplemented
EVENTS = {}
MATCH_GROUPS = defaultdict(lambda: [])
PRIORITY = 99
CLASS_NAME_KEY = "event"

def initialize(self, timestamp, user, comment):
self.timestamp = Timestamp(timestamp)
self.user = User(user)
self.comment = str(comment)

def to_json(self):
doc = super().to_json()
doc['event'] = self.__class__.__name__
return doc


@classmethod
def from_json(cls, doc):
if 'event' in doc:
EventClass = cls.EVENTS.get(doc['event'], cls)
new_doc = copy.copy(doc)
del new_doc['event']
return EventClass.from_json(new_doc)
else:
return cls._from_json(doc)

@classmethod
def register(cls, EventClass):
for match in EventClass.MATCHES:
cls.MATCH_GROUPS[match].append(EventClass)
cls.MATCH_GROUPS[match].sort(key=lambda e:e.PRIORITY)

cls.EVENTS[EventClass.__name__] = EventClass
super().register(EventClass)

@classmethod
def matches(cls, match):
Expand Down
2 changes: 1 addition & 1 deletion mwevents/types/events/tests/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def test_construction_and_values():
eq_(event.comment, comment)

eq_(event, Event(event))
eq_(event, Event(event.to_json()))
#eq_(event, Event(event.to_json()))

def test_json_of_subclasses():
user = User(10, "Foo")
Expand Down

0 comments on commit d3f1e05

Please sign in to comment.