# channelVideoDataExtraction.py
import re
import pandas as pd
import googleapiclient.discovery


def getVideoComments(api_key, video_id):
    # Create a YouTube Data API client
    youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
    # Request the first page of comment threads for the video
    request = youtube.commentThreads().list(part="snippet,replies",
                                            videoId=video_id,
                                            maxResults=100,
                                            textFormat='plainText')
    response = request.execute()
    all_comments = []
    for comment in response['items']:
        top_snippet = comment["snippet"]["topLevelComment"]['snippet']
        comment_data = {
            'comment_id': comment['id'],
            'author': top_snippet.get('authorDisplayName', None),
            'like_count': top_snippet.get('likeCount', None),
            'comment_text': top_snippet.get('textOriginal', None),
            'comment_date': top_snippet.get('publishedAt', None),
        }
        all_comments.append(comment_data)
        # Check if there are replies (the API may not include every reply
        # inline; comments.list would be needed for complete threads)
        if 'replies' in comment:
            for reply in comment['replies']['comments']:
                reply_data = {
                    'comment_id': reply['id'],
                    'author': reply['snippet'].get('authorDisplayName', None),
                    'comment_text': reply['snippet'].get('textOriginal', None),
                    'comment_date': reply['snippet'].get('publishedAt', None),
                    'like_count': reply['snippet'].get('likeCount', None),
                    'linkage': comment_data['comment_id'],  # Link reply to the main comment
                }
                all_comments.append(reply_data)
    next_page_available = response.get('nextPageToken')
    is_other_pages = True
    while is_other_pages:
        # Cap collection at roughly 1000 comments; replies can push the total
        # past an exact 1000, so compare with >= rather than ==
        if len(all_comments) >= 1000:
            break
        if next_page_available is None:
            is_other_pages = False
        else:
            request = youtube.commentThreads() \
                .list(part="snippet,replies",
                      videoId=video_id,
                      maxResults=100,
                      textFormat='plainText',
                      pageToken=next_page_available)
            response = request.execute()
            for comment in response['items']:
                top_snippet = comment["snippet"]["topLevelComment"]['snippet']
                comment_data = {
                    'comment_id': comment['id'],
                    'author': top_snippet.get('authorDisplayName', None),
                    'like_count': top_snippet.get('likeCount', None),
                    'comment_text': top_snippet.get('textOriginal', None),
                    'comment_date': top_snippet.get('publishedAt', None),
                }
                all_comments.append(comment_data)
                # Check if there are replies
                if 'replies' in comment:
                    for reply in comment['replies']['comments']:
                        reply_data = {
                            'comment_id': reply['id'],
                            'author': reply['snippet'].get('authorDisplayName', None),
                            'comment_text': reply['snippet'].get('textOriginal', None),
                            'comment_date': reply['snippet'].get('publishedAt', None),
                            'like_count': reply['snippet'].get('likeCount', None),
                            'linkage': comment_data['comment_id'],  # Link reply to the main comment
                        }
                        all_comments.append(reply_data)
            next_page_available = response.get('nextPageToken')
    # Create the dataframe
    comment_data = pd.DataFrame(all_comments)
    # Define the regex pattern for illegal characters.
    # For this example, I'll remove non-printable ASCII characters and the
    # styled character '𝙄', e.g. '𝙄 love it!' -> ' love it!'
    pattern = r'[^\x20-\x7E]|𝙄'
    # Remove illegal characters from the entire dataframe
    comment_data.replace(pattern, '', regex=True, inplace=True)
    comment_data = comment_data.drop_duplicates()
    comment_data["like_count"] = pd.to_numeric(comment_data["like_count"],
                                               errors='coerce')
    # Remove duplicates based on the 'comment_text' column
    comment_data = comment_data.drop_duplicates(subset='comment_text')
    # Convert 'comment_date' to a pandas datetime object (the API returns
    # ISO 8601 UTC timestamps)
    comment_data['comment_date'] = pd.to_datetime(comment_data['comment_date'])
    # Format 'comment_date' as a 12-hour timestamp with AM/PM
    comment_data['comment_date'] = comment_data['comment_date']\
        .dt.strftime('%Y-%m-%d %I:%M:%S %p')
    # Sort the DataFrame by "like_count" in descending order
    comment_data = comment_data.sort_values(by="like_count", ascending=False)
    # Reset the index
    comment_data.reset_index(drop=True, inplace=True)
    comment_data.to_excel("all_comments.xlsx", index=False)
    print(comment_data.head(5))
    return comment_data
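
# Example usage (sketch): the key and video ID below are placeholders, not
# working values; supply your own YouTube Data API v3 key.
# comments_df = getVideoComments("YOUR_API_KEY", "VIDEO_ID")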


def getVideoList(api_key, playlist_id):
    # Create a YouTube Data API client
    youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
    request = youtube.playlistItems().list(part="contentDetails,snippet",
                                           playlistId=playlist_id,
                                           maxResults=50)
    response = request.execute()
    all_videos = []
    for vid in response['items']:
        vid_stats = {
            'id': vid['contentDetails'].get('videoId', None),
            'title': vid['snippet'].get('title', None),
            # Deleted or private items can lack thumbnails, so walk the path
            # defensively
            'thumbnail': vid['snippet'].get('thumbnails', {})
                         .get('default', {}).get('url', None)
        }
        all_videos.append(vid_stats)
    next_page_available = response.get('nextPageToken')
    is_next_pages = True
    while is_next_pages:
        if next_page_available is None:
            is_next_pages = False
        else:
            request = youtube.playlistItems().list(part="contentDetails,snippet",
                                                   playlistId=playlist_id,
                                                   maxResults=50,
                                                   pageToken=next_page_available)
            response = request.execute()
            for vid in response['items']:
                vid_stats = {
                    'id': vid['contentDetails'].get('videoId', None),
                    'title': vid['snippet'].get('title', None),
                    'thumbnail': vid['snippet'].get('thumbnails', {})
                                 .get('default', {}).get('url', None)
                }
                all_videos.append(vid_stats)
            next_page_available = response.get('nextPageToken')
    # print(all_videos)
    return all_videos
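
# Example usage (sketch; both arguments are placeholders). A channel's uploads
# playlist ID is usually its channel ID with the leading 'UC' changed to 'UU'.
# videos = getVideoList("YOUR_API_KEY", "UUxxxxxxxxxxxxxxxxxxxxxx")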


def buildVideoListDataframe(api_key, video_ids):
    youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
    all_vids_stats = []
    # videos().list accepts at most 50 IDs per request, so batch the lookups
    for i in range(0, len(video_ids), 50):
        request = youtube.videos().list(
            part='snippet,contentDetails,statistics',
            id=','.join(video_ids[i:i + 50]))
        response = request.execute()
        for vid in response['items']:
            thumbnail_url = vid['snippet']['thumbnails'].get('standard', {}).get('url', None)
            vid_stats = {
                'id': vid.get('id', None),
                'title': vid['snippet'].get('title', None),
                'published_date': vid['snippet'].get('publishedAt', None),
                'tags': vid['snippet'].get('tags', []),
                'duration': vid['contentDetails'].get('duration', None),
                'view_count': vid['statistics'].get('viewCount', None),
                'like_count': vid['statistics'].get('likeCount', None),
                'favorite_count': vid['statistics'].get('favoriteCount', None),
                'comment_count': vid['statistics'].get('commentCount', None),
                'thumbnail': thumbnail_url
            }
            all_vids_stats.append(vid_stats)
    # Create the dataframe
    vids_info = pd.DataFrame(all_vids_stats)
    # Convert columns to numeric
    numeric_columns = ['comment_count', 'like_count', 'view_count']
    vids_info[numeric_columns] = vids_info[numeric_columns]\
        .apply(pd.to_numeric, errors='coerce')

    # Function to convert an ISO 8601 duration (e.g. 'PT1H2M30S') to minutes
    def iso8601_duration_to_minutes(duration):
        # Guard against missing durations
        if not isinstance(duration, str):
            return 0.0
        hours_match = re.search(r'(\d+)H', duration)
        minutes_match = re.search(r'(\d+)M', duration)
        seconds_match = re.search(r'(\d+)S', duration)
        # Get the hours, minutes and seconds values, or default to 0 if they
        # are not found (durations over an hour need the hours term).
        hours = int(hours_match.group(1)) if hours_match else 0
        minutes = int(minutes_match.group(1)) if minutes_match else 0
        seconds = int(seconds_match.group(1)) if seconds_match else 0
        # Calculate the total duration in minutes.
        total_minutes = hours * 60 + minutes + seconds / 60.0
        return total_minutes
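    # Sketch of expected conversions: 'PT1H2M30S' -> 62.5, 'PT45S' -> 0.75;
    # live items may report 'P0D', which yields 0.0 here.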
    # Apply the conversion function to the 'duration' column
    vids_info['duration_minutes'] = vids_info['duration']\
        .apply(iso8601_duration_to_minutes)
    # Convert 'published_date' to a pandas datetime object (ISO 8601 UTC)
    vids_info['published_date'] = pd.to_datetime(vids_info['published_date'])
    # Format 'published_date' as a 12-hour timestamp with AM/PM
    vids_info['published_date'] = vids_info['published_date']\
        .dt.strftime('%Y-%m-%d %I:%M:%S %p')
    vids_info.to_excel("all_vids_info.xlsx", index=False)
    print(vids_info.head(5))
    return vids_info


# Example calls (API_KEY, playlist_id and "video_id" are placeholders; see the
# runnable sketch below):
# video_ids = getVideoList(API_KEY, playlist_id)
# video_ids = [video['id'] for video in video_ids if video['id'] is not None]
# buildVideoListDataframe(API_KEY, video_ids)
# getVideoComments(API_KEY, "video_id")
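
# Minimal end-to-end sketch. API_KEY and PLAYLIST_ID below are hypothetical
# placeholders, not values shipped with this script; the calls will fail until
# real credentials are supplied.
if __name__ == "__main__":
    API_KEY = "YOUR_API_KEY"                  # placeholder, not a real key
    PLAYLIST_ID = "UUxxxxxxxxxxxxxxxxxxxxxx"  # placeholder uploads playlist
    videos = getVideoList(API_KEY, PLAYLIST_ID)
    ids = [video['id'] for video in videos if video['id'] is not None]
    buildVideoListDataframe(API_KEY, ids)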