Skip to content

Adds support for youtube channels #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 162 additions & 76 deletions castero/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, url=None, file=None, text=None, **kwargs) -> None:
self._file = file
self._tree = None
self._validated = False
self._is_atom = False

self._title = kwargs.get("title", None)
self._description = kwargs.get("description", None)
Expand Down Expand Up @@ -178,91 +179,145 @@ def _validate_feed(self):
"""
assert self._tree is not None

# root should be an rss tag
if self._tree.tag != "rss":
raise FeedStructureError("XML document is not an RSS feed")

# root should have version attribute which equals 2.0
if "version" in self._tree.attrib:
if self._tree.attrib["version"] != "2.0":
raise FeedStructureError("RSS version is not 2.0")
else:
raise FeedStructureError("RSS feed does not have a version attribute")

# root should a channel tag as its child
# theoretically the root should have only one child, but see the
# exception listed in the method description
root_children = list(self._tree)
if len(root_children) > 0:
channel = None
for root_child in root_children:
if root_child.tag == "channel":
channel = root_child
break
if channel is None:
raise FeedStructureError("RSS feed does not have a channel tag as its child")

# Channel should have at least 3 children, including a
# title and description tag. There should be a "link" tag, but we
# allow an "atom:link" tag as a substitute
channel_children = list(channel)
if len(channel_children) >= 3:
chan_title_tags = channel.findall("title")
chan_link_tags = channel.findall("link")
chan_atomlink_tags = channel.findall("{http://www.w3.org/2005/Atom}link")
chan_description_tags = channel.findall("description")

if len(chan_title_tags) != 1:
#raise FeedStructureError(" tag is " +self._tree.tag )
if "feed" in self._tree.tag:
self._is_atom = True

atom_children = list(self._tree)
#Atom feed should contain at least title, link and author
if len(atom_children) >= 3:
atom_title_tags = self._tree.findall("{http://www.w3.org/2005/Atom}title")
atom_link_tags = self._tree.findall("{http://www.w3.org/2005/Atom}link")
atom_author_tags = self._tree.findall("{http://www.w3.org/2005/Atom}author")
if len(atom_title_tags) != 1:
raise FeedStructureError(
"RSS feed's channel has too many or too few"
" title tags; expected 1, was: " + str(len(chan_title_tags))
"Atom feed's channel has too many or too few"
" title tags; expected 1, was: " + str(len(atom_title_tags))
)
else:
if channel.find("title").text is None:
raise FeedStructureError("RSS feed's channel has no title text")
if len(chan_link_tags) > 1:
if self._tree.find("{http://www.w3.org/2005/Atom}title").text is None:
raise FeedStructureError("Atom feed's channel has no title text")
if len(atom_link_tags) <= 0:
raise FeedStructureError(
"RSS feed's channel has too many"
" link tags; expected 1, was: "
+ str(len(chan_link_tags))
"Atom feed's channel has too few"
" link tags; expected >0, was: "
+ str(len(atom_link_tags))
+ ". The corresponding title is: "
+ str(chan_title_tags[0].text)
+ str(atom_title_tags[0].text)
)
if len(chan_link_tags) == 0:
if len(chan_atomlink_tags) == 0:
raise FeedStructureError(
"RSS feed's channel had 0 link tags, expected 1."
+ " There were also no atom:link tags available to"
+ " use as a substitute"
+ ". The corresponding title is: "
+ str(chan_title_tags[0].text)
)
if len(chan_description_tags) != 1:
if len(atom_author_tags) != 1:
raise FeedStructureError(
"RSS feed's channel has too many or too few"
" description tags; expected 1, was: "
+ str(len(chan_description_tags))
"Atom feed's channel has too many or too few"
" author tags; expected 1, was: "
+ str(len(atom_author_tags))
+ ". The corresponding title is: "
+ str(chan_title_tags[0].text)
+ str(atom_title_tags[0].text)
)

# if the channel has any items, each item should have
# at least a title or description tag
channel_item_tags = channel.findall("item")
for item in channel_item_tags:
if len(item.findall("title") + item.findall("description")) < 1:
# if the feed has any entries, each entry should have
# at least a title
atom_entry_tags = self._tree.findall("entry")
for entry in atom_entry_tags:
if len(entry.findall("title")) < 1:
raise FeedStructureError(
"An item in the RSS feed's channel did not"
"An entry in the Atom feed's channel did not"
" have at least one of a title or a"
" description tag"
)


else:
raise FeedStructureError(
"RSS feed's channel does not have enough required"
" children; expected >=3, was: " + str(len(channel_children))
" children; expected >=3, was: " + str(len(atom_children))
)
else:
raise FeedStructureError("RSS feed does not have any children; expected 1 (a channel" " tag)")

if self._is_atom == False:
# root should be an rss tag
if self._tree.tag != "rss" :
raise FeedStructureError("XML document is not an RSS feed")

# root should have version attribute which equals 2.0
if "version" in self._tree.attrib:
if self._tree.attrib["version"] != "2.0":
raise FeedStructureError("RSS version is not 2.0")
else:
raise FeedStructureError("RSS feed does not have a version attribute")

# root should have a channel tag as its child
# theoretically the root should have only one child, but see the
# exception listed in the method description
root_children = list(self._tree)
if len(root_children) > 0:
channel = None
for root_child in root_children:
if root_child.tag == "channel":
channel = root_child
break
if channel is None:
raise FeedStructureError("RSS feed does not have a channel tag as its child")

# Channel should have at least 3 children, including a
# title and description tag. There should be a "link" tag, but we
# allow an "atom:link" tag as a substitute
channel_children = list(channel)
if len(channel_children) >= 3:
chan_title_tags = channel.findall("title")
chan_link_tags = channel.findall("link")
chan_atomlink_tags = channel.findall("{http://www.w3.org/2005/Atom}link")
chan_description_tags = channel.findall("description")

if len(chan_title_tags) != 1:
raise FeedStructureError(
"RSS feed's channel has too many or too few"
" title tags; expected 1, was: " + str(len(chan_title_tags))
)
else:
if channel.find("title").text is None:
raise FeedStructureError("RSS feed's channel has no title text")
if len(chan_link_tags) > 1:
raise FeedStructureError(
"RSS feed's channel has too many"
" link tags; expected 1, was: "
+ str(len(chan_link_tags))
+ ". The corresponding title is: "
+ str(chan_title_tags[0].text)
)
if len(chan_link_tags) == 0:
if len(chan_atomlink_tags) == 0:
raise FeedStructureError(
"RSS feed's channel had 0 link tags, expected 1."
+ " There were also no atom:link tags available to"
+ " use as a substitute"
+ ". The corresponding title is: "
+ str(chan_title_tags[0].text)
)
if len(chan_description_tags) != 1:
raise FeedStructureError(
"RSS feed's channel has too many or too few"
" description tags; expected 1, was: "
+ str(len(chan_description_tags))
+ ". The corresponding title is: "
+ str(chan_title_tags[0].text)
)

# if the channel has any items, each item should have
# at least a title or description tag
channel_item_tags = channel.findall("item")
for item in channel_item_tags:
if len(item.findall("title") + item.findall("description")) < 1:
raise FeedStructureError(
"An item in the RSS feed's channel did not"
" have at least one of a title or a"
" description tag"
)
else:
raise FeedStructureError(
"RSS feed's channel does not have enough required"
" children; expected >=3, was: " + str(len(channel_children))
)
else:
raise FeedStructureError("RSS feed does not have any children; expected 1 (a channel" " tag)")

self._validated = True

Expand All @@ -279,8 +334,14 @@ def _parse_metadata(self):
channel = root_child
break

self._title = channel.find("title").text.strip()
self._description = channel.find("description").text.strip()
if self._is_atom:
#handle atom
channel = self._tree
self._title = channel.find("{http://www.w3.org/2005/Atom}title").text.strip()
self._description = channel.find("{http://www.w3.org/2005/Atom}author").text.strip()
else:
self._title = channel.find("title").text.strip()
self._description = channel.find("description").text.strip()

link_tags = channel.findall("link")
if len(link_tags) > 0:
Expand All @@ -305,20 +366,40 @@ def parse_episodes(self) -> List[Episode]:
:returns List[Episode]: the episodes in this feed, which need to be added to
the database
"""

channel = None
for root_child in list(self._tree):
if root_child.tag == "channel":
channel = root_child
break

if self._is_atom:
channel = self._tree
items = channel.findall("{http://www.w3.org/2005/Atom}entry")
else:
items = channel.findall("item")

i = 0
episodes = []
for item in channel.findall("item"):
item_title = item.find("title")
item_description = item.find("description")
item_link = item.find("link")
item_pubdate = item.find("pubDate")
item_copyright = item.find("copyright")
item_enclosure = item.find("enclosure")
for item in items:
if self._is_atom:
item_title = item.find("{http://www.w3.org/2005/Atom}title")
for item_child in list(item):
if item_child.tag == "{http://search.yahoo.com/mrss/}group":
item_media_group = item_child

item_description = item_media_group.find("{http://search.yahoo.com/mrss/}description")
item_link = item.find("{http://www.w3.org/2005/Atom}link")
item_pubdate = item.find("{http://www.w3.org/2005/Atom}published")
item_copyright = item.find("copyright")
item_enclosure = item_link
else:
item_title = item.find("title")
item_description = item.find("description")
item_link = item.find("link")
item_pubdate = item.find("pubDate")
item_copyright = item.find("copyright")
item_enclosure = item.find("enclosure")

item_title_str = None
item_description_str = None
Expand All @@ -327,19 +408,24 @@ def parse_episodes(self) -> List[Episode]:
item_copyright_str = None
item_enclosure_str = None


if item_title is not None and item_title.text is not None:
item_title_str = item_title.text.strip()
if item_description is not None and item_description.text is not None:
item_description_str = item_description.text.strip()
if item_link is not None and item_link.text is not None:
item_link_str = item_link.text.strip()
if item_link is not None and self._is_atom:
item_link_str = item_link.attrib["href"]
if item_pubdate is not None and item_pubdate.text is not None:
item_pubdate_str = item_pubdate.text.strip()
if item_copyright is not None and item_copyright.text is not None:
item_copyright_str = item_copyright.text.strip()
if item_enclosure is not None:
if "url" in item_enclosure.attrib.keys():
item_enclosure_str = item_enclosure.attrib["url"]
if "href" in item_enclosure.attrib.keys():
item_enclosure_str = item_enclosure.attrib["href"]

# if we were unable to find an enclosure for this episode,
# don't add it
Expand Down