-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgopherfeed.py
162 lines (143 loc) · 6.09 KB
/
gopherfeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright (c) 2013, Luke Maurits <[email protected]>
# Published under the terms of the BSD 3-Clause License
# (see LICENSE file or http://opensource.org/licenses/BSD-3-Clause)
# Version of Gopherfeed; interpolated into the "Converted ... by Gopherfeed"
# footer ("plug") lines generated below.
__version__ = 1.2
import feedparser
import codecs
import os
import socket
import time
# Replace feedparser's module-level USER_AGENT with a Gopherfeed-specific
# string (presumably sent when feedparser fetches a feed -- see its docs).
feedparser.USER_AGENT = "Gopherfeed +https://github.com/lmaurits/gopherfeed"
# strftime() format for the optional "[timestamp]" prefix on entry lines.
_TIME_FORMAT = "%Y-%m-%d %H:%M"
def _build_mapline(entry, timestamp=False, feed_object=None):
    """Return one line of a Gophermap, built from one feed entry object.

    Arguments:
    entry -- feed entry providing ``title`` and ``link``; its
        ``published_parsed`` (preferred) or ``updated_parsed`` field is
        used for the optional timestamp.
    timestamp -- if true, prefix the description with the entry's time,
        formatted with _TIME_FORMAT.  An entry with no usable date is
        emitted without a prefix instead of raising.
    feed_object -- optional feed; when given (and non-empty) its title, or
        "Untitled feed" if it has none, is prepended to the description.

    The result is an "h" type item: "h<description><TAB>URL:<link>".
    """
    filetype = "h"
    # TAB is the gophermap field separator, so it may not occur in the text.
    descr = entry.title.replace("\t", " ")
    if feed_object:
        descr = "%s: %s" % (feed_object.get("title", "Untitled feed"), descr)
    if timestamp:
        # Prefer the publication date, falling back to the update date.
        parsed = None
        if "published_parsed" in entry:
            parsed = entry.published_parsed
        elif "updated_parsed" in entry:
            parsed = entry.updated_parsed
        # Undated entries (field missing, or present but None) simply get
        # no timestamp prefix.
        if parsed is not None:
            epoch = time.mktime(parsed)
            timestring = time.strftime(_TIME_FORMAT, time.localtime(epoch))
            descr = "[%s] %s" % (timestring, descr)
    return "%s%s\tURL:%s" % (filetype, descr, entry.link)
def gopherize_feed_object(feed_obj, timestamp=False, plug=True):
    """Return a gophermap string for a feed object produced by feedparser.

    Arguments:
    feed_obj -- parsed feed exposing ``feed``, ``entries`` and ``version``.
    timestamp -- passed on to _build_mapline; if true, each entry line is
        prefixed with its date.
    plug -- if true, append a ruled footer naming the feed type and the
        Gopherfeed version.

    The map starts with the feed title (falling back to the feed link, then
    to "Untitled feed"), the description if there is one, and a blank line.
    The entries follow, most recent first.  Entries without a link, or
    without a published/updated date, are left out.

    Raises Exception if the feed object has no entries.
    """
    feed, entries = feed_obj.feed, feed_obj.entries
    if not entries:
        raise Exception("Problem either fetching or parsing feed")

    # TAB is the gophermap field separator, so strip it from free text.
    feed_title = feed.get("title", feed.get("link", "Untitled feed"))
    maplines = [feed_title.replace("\t", " ")]
    description = feed.get("description", None)
    if description:
        maplines.append(description.replace("\t", " "))
    maplines.append("")

    timestamped_maplines = []
    for entry in entries:
        # A mapline needs a URL, and entries occasionally lack one.
        if "link" not in entry:
            continue
        if "published_parsed" in entry:
            parsed = entry.published_parsed
        elif "updated_parsed" in entry:
            parsed = entry.updated_parsed
        else:
            # Undated entries cannot be ordered, so they are not listed.
            continue
        mapline = _build_mapline(entry, timestamp)
        timestamped_maplines.append((time.mktime(parsed), mapline))

    # Entries are not guaranteed to appear in the feed in chronological
    # order, so sort them, newest first.
    timestamped_maplines.sort(reverse=True)
    maplines.extend(mapline for _, mapline in timestamped_maplines)

    if plug:
        version = feed_obj.version
        if version.startswith("rss"):
            feed_type = "RSS feed"
        elif version.startswith("atom"):
            feed_type = "Atom feed"
        else:
            feed_type = "Unknown feed type"
        maplines.append("_" * 70)
        plug_line = "Converted from %s by Gopherfeed %s" % (feed_type, __version__)
        maplines.append(plug_line.rjust(70))
    return "\n".join(maplines)
def gopherize_feed(feed_url, timestamp=False, plug=True):
    """Parse feed_url with feedparser and return the resulting gophermap string.

    timestamp and plug are handed through to gopherize_feed_object.
    """
    parsed_feed = feedparser.parse(feed_url)
    return gopherize_feed_object(parsed_feed, timestamp, plug)
def _slugify(feed):
    """Make a simple string from feed title, to use as a directory name.

    Non-ASCII characters are dropped, punctuation and spaces are turned
    into underscores, and the result is lower-cased.  If the feed has no
    title, its link (or failing that "Untitled feed") is used instead.
    """
    title = feed.get("title", feed.get("link", "Untitled feed"))
    # encode() yields bytes on Python 3; decode straight back so that the
    # replacements below operate on text under both Python 2 and 3.
    slug = title.encode("ascii", "ignore").decode("ascii")
    # Characters replaced by "_":  . , : ; - " ' \ ` /  and the space.
    for kill in ".,:;-\"'\\`/ ":
        slug = slug.replace(kill, "_")
    return slug.lower()
def build_feed_index(feed_objects, directory, header=None, hostname=None,
                     port=70, sort=None, plug=True):
    """
    Return a gophermap string which presents an index for all the feeds in
    feed_objects.

    Each feed becomes one type "1" (directory) line whose selector is
    ``directory`` joined with the feed's slug (see _slugify).

    Arguments:
    feed_objects -- iterable of parsed feeds exposing ``feed`` and ``entries``.
    directory -- path prefix for the per-feed selectors.
    header -- optional text placed, followed by a blank line, before the index.
    hostname -- host written into each line; defaults to socket.getfqdn().
    port -- port written into each line.
    sort -- "alpha" orders feeds by lower-cased title, "time" by most recent
        entry (newest first); anything else keeps the order of feed_objects.
    plug -- if true, append a ruled footer naming the Gopherfeed version.
    """
    if not hostname:
        hostname = socket.getfqdn()

    decorated_maplines = []
    for index, feed_obj in enumerate(feed_objects):
        feed, entries = feed_obj.feed, feed_obj.entries
        feed_dir = os.path.join(directory, _slugify(feed))
        feed_title = feed.get("title", feed.get("link", "Untitled feed"))
        # TAB is the gophermap field separator, so strip it from the title.
        feed_title = feed_title.replace("\t", " ")
        mapline = "1%s\t%s\t%s\t%d" % (feed_title, feed_dir, hostname, port)
        if sort == "alpha":
            sort_key = feed_title.lower()
        elif sort == "time":
            sort_key = _most_recent_epoch(entries)
        else:
            sort_key = index
        decorated_maplines.append((sort_key, mapline))

    # Time ordering is newest first; the other orderings are ascending.
    decorated_maplines.sort(reverse=(sort == "time"))

    maplines = []
    if header:
        maplines.append(header)
        maplines.append("")
    maplines.extend(mapline for _, mapline in decorated_maplines)
    if plug:
        maplines.append("_" * 70)
        plug_line = "Converted from RSS/Atom feeds by Gopherfeed %s" % __version__
        maplines.append(plug_line.rjust(70))
    return "\n".join(maplines)


def _most_recent_epoch(entries):
    """Return the newest published/updated time among entries, or 0.0 if none is dated."""
    epochs = []
    for entry in entries:
        if "published_parsed" in entry:
            parsed = entry.published_parsed
        elif "updated_parsed" in entry:
            parsed = entry.updated_parsed
        else:
            continue
        if parsed is not None:
            epochs.append(time.mktime(parsed))
    return max(epochs) if epochs else 0.0
def combine_feed_objects(feed_objs, max_entries=20, timestamp=False, plug=True):
    """
    Merge the entries of all the given feed objects into a single
    gophermap string.

    Every entry that has a link and a published/updated date becomes one
    mapline, labelled with its feed.  The maplines are ordered newest first
    and cut off after max_entries.  If plug is true, a ruled footer naming
    the Gopherfeed version is appended.
    """
    dated_lines = []
    for feed_obj in feed_objs:
        feed, entries = feed_obj.feed, feed_obj.entries
        for entry in entries:
            # Entries occasionally come without a link; those are skipped.
            if "link" not in entry:
                continue
            # Publication date wins over update date; undated entries are
            # skipped as well.
            if "published_parsed" in entry:
                parsed = entry.published_parsed
            elif "updated_parsed" in entry:
                parsed = entry.updated_parsed
            else:
                continue
            line = _build_mapline(entry, timestamp, feed)
            dated_lines.append((time.mktime(parsed), line))

    newest_first = sorted(dated_lines, reverse=True)
    maplines = [line for _, line in newest_first[:max_entries]]
    if plug:
        footer = "Converted from RSS/Atom feeds by Gopherfeed %s" % __version__
        maplines += ["_" * 70, footer.rjust(70)]
    return "\n".join(maplines)