2
2
from mimetypes import guess_type
3
3
from os .path import basename , isfile , splitext , getsize , exists
4
4
from time import sleep
5
+ from typing import List , Dict
5
6
6
- from pyperclip import copy
7
- from requests import post
7
+ import pyperclip
8
+ import requests
8
9
from watchdog .events import FileSystemEventHandler
9
10
from watchdog .observers import Observer
10
11
12
+ # Configuration
11
13
# Absolute path to the folder to monitor for new files
12
- MONITOR_FOLDER_PATH = "<path>"
14
+ MONITOR_FOLDER_PATH : str = "<path>"
13
15
# List of valid file extensions that will be considered for upload
14
- VALID_EXTENSIONS = [".png" , ".jpg" , ".jpeg" , ".mov" ]
16
+ VALID_EXTENSIONS : List [ str ] = [".png" , ".jpg" , ".jpeg" , ".mov" ]
15
17
# The URL of the API endpoint for uploading files to your Zipline instance
16
- API_UPLOAD_URL = "https://<domain>/api/upload"
18
+ API_UPLOAD_URL : str = "https://<domain>/api/upload"
17
19
# The access token associated with your user account for authentication
18
- USER_ACCESS_TOKEN = "<access_token>"
20
+ USER_ACCESS_TOKEN : str = "<access_token>"
19
21
# The maximum allowable size for an individual file, specified in megabytes
20
- MAX_FILE_SIZE_MB = 40
22
+ MAX_FILE_SIZE_MB : int = 40
21
23
# A dictionary containing upload options for the Zipline API.
22
24
# For detailed information on available options, refer to the official documentation:
23
25
# https://zipline.diced.sh/docs/guides/upload-options
24
- UPLOAD_OPTIONS = {"Format" : "RANDOM" , "Embed" : "false" }
26
+ UPLOAD_OPTIONS : Dict [ str , str ] = {"Format" : "RANDOM" , "Embed" : "false" }
25
27
# Used to decide if the URL for the uploaded files should open in the browser.
26
- OPEN_URL_IN_BROWSER = False
28
+ OPEN_URL_IN_BROWSER : bool = False
27
29
28
30
29
- def validate_file (path ) :
31
+ def validate_file (path : str ) -> bool :
30
32
"""
31
33
Validates a file based on its type and size.
32
34
33
- Parameters :
34
- - path (str): The absolute path to the file.
35
+ Args :
36
+ path (str): The absolute path to the file.
35
37
36
38
Returns:
37
- - bool: True if the file is valid, False otherwise.
39
+ bool: True if the file is valid, False otherwise.
38
40
"""
41
+ # Check if it's a file and not hidden
39
42
if not isfile (path ) or basename (path ).startswith ('.' ):
40
43
return False
41
44
45
+ # Check if the file extension is valid
42
46
if splitext (path )[1 ] not in VALID_EXTENSIONS :
43
47
print (f"Error: { basename (path )} has an unsupported file extension. "
44
48
f"Allowed extensions: { ', ' .join (VALID_EXTENSIONS )} " )
45
49
return False
46
50
47
- if getsize (path ) >= MAX_FILE_SIZE_MB * (1 << 20 ):
51
+ # Check if the file size is within the limit
52
+ if getsize (path ) >= MAX_FILE_SIZE_MB * (1 << 20 ): # Convert MB to bytes
48
53
print (f"Error: { basename (path )} exceeds the permitted file size limit "
49
- f"({ getsize (path ) / (1 << 20 ):.2f} MB > 40MB )." )
54
+ f"({ getsize (path ) / (1 << 20 ):.2f} MB > { MAX_FILE_SIZE_MB } MB )." )
50
55
return False
51
56
52
57
return True
53
58
54
59
55
60
class MonitorFolder (FileSystemEventHandler ):
56
61
def __init__ (self ):
57
- self .array = []
62
+ self .array : List [ Dict [ str , str ]] = [] # Store processed files
58
63
59
- def handle_array (self , event ):
64
+ def handle_array (self , event ) -> bool :
60
65
"""
61
66
Handles an array of files by processing each file in the array and updating the `array` attribute.
62
67
63
- Parameters:
64
- - self (object): The object handling the array of files.
65
- - event (event.FileSystemEvent): The event representing the file being processed.
68
+ Args:
69
+ event (FileSystemEvent): The event representing the file being processed.
66
70
67
71
Returns:
68
72
bool: Whether the function executed successfully or not.
69
73
"""
70
74
seen_files = set ()
71
75
unique_array = []
72
76
73
- for item in reversed (self .array ): # Loop through the array of files and process each one
77
+ # Process files in reverse order to keep the most recent entries
78
+ for item in reversed (self .array ):
74
79
if item ['file' ] not in seen_files :
75
80
unique_array .append (item )
76
81
seen_files .add (item ['file' ])
77
82
83
+ # Check if the current file has already been processed
78
84
if item ['file' ] == event .src_path :
79
85
if item ['status' ] == 'processed' :
80
86
return False
81
- item ['status' ] = 'processed' # Mark the file as processed
87
+ item ['status' ] = 'processed'
82
88
83
- self .array = unique_array [- 10 :] # Keep the last 10 items in the array
89
+ # Keep only the last 10 processed files
90
+ self .array = unique_array [- 10 :]
84
91
85
92
return True
86
93
87
94
def upload_file (self , event ):
95
+ """
96
+ Uploads the file to the specified API endpoint.
97
+
98
+ Args:
99
+ event (FileSystemEvent): The event representing the file to be uploaded.
100
+ """
101
+ if not self .handle_array (event ):
102
+ return # File already processed, skip upload
103
+
104
+ headers = {"Authorization" : USER_ACCESS_TOKEN , ** UPLOAD_OPTIONS }
88
105
try :
89
- if not self .handle_array (event ):
90
- return
91
-
92
- headers = {"Authorization" : USER_ACCESS_TOKEN , ** UPLOAD_OPTIONS }
93
- files = {
94
- "file" : (
95
- basename (event .src_path ),
96
- open (event .src_path , "rb" ),
97
- guess_type (event .src_path )[0 ],
98
- )
99
- }
100
- response = post (API_UPLOAD_URL , headers = headers , files = files , timeout = 10 )
101
-
102
- if response .status_code == 200 :
103
- print (f"File uploaded successfully: { response .json ()['files' ][0 ]} " )
104
- copy (response .json ()["files" ][0 ]) # Copy the URL to the clipboard
105
-
106
- if OPEN_URL_IN_BROWSER :
107
- webbrowser .open (
108
- response .json ()["files" ][0 ]) # Open the uploaded file in the browser if OPEN_URL_IN_BROWSER
109
- elif response .status_code == 401 :
110
- print ("Authentication failed. Please check your USER_ACCESS_TOKEN." )
111
- else :
112
- print (f"File upload failed. Status code: { response .status_code } " )
113
- except PermissionError as error :
114
- print (error )
106
+ with open (event .src_path , "rb" ) as file :
107
+ files = {
108
+ "file" : (
109
+ basename (event .src_path ),
110
+ file ,
111
+ guess_type (event .src_path )[0 ],
112
+ )
113
+ }
114
+ # Send POST request to upload the file
115
+ response = requests .post (API_UPLOAD_URL , headers = headers , files = files , timeout = 10 )
116
+
117
+ response .raise_for_status () # Raise an exception for bad responses
118
+ file_url = response .json ()["files" ][0 ]
119
+ print (f"File uploaded successfully: { file_url } " )
120
+ pyperclip .copy (file_url ) # Copy the URL to clipboard
121
+
122
+ if OPEN_URL_IN_BROWSER :
123
+ webbrowser .open (file_url ) # Open the URL in browser if configured
124
+ except requests .exceptions .RequestException as e :
125
+ print (f"File upload failed: { str (e )} " )
126
+ except PermissionError as e :
127
+ print (f"Permission error: { str (e )} " )
115
128
116
129
def on_any_event (self , event ):
117
130
"""
118
- Event handler for created files in the monitored folder.
119
-
120
- Parameters:
121
- - event (FileSystemEvent): The event object representing the created file.
131
+ Event handler for created or modified files in the monitored folder.
122
132
123
- Returns :
124
- None
133
+ Args :
134
+ event (FileSystemEvent): The event object representing the file event.
125
135
"""
126
136
if event .event_type not in ['created' , 'modified' ]:
127
- return
137
+ return # Ignore events other than file creation or modification
128
138
129
139
if not validate_file (event .src_path ):
130
- return # Exit early if validation fails
140
+ return # Skip invalid files
131
141
132
142
self .array .append ({'file' : event .src_path , 'status' : 'processing' })
133
143
134
- sleep (0.02 ) # Apply a small sleep time to allow for system events
144
+ sleep (0.02 ) # Small delay to allow for system events
135
145
136
146
self .upload_file (event ) # Attempt to upload the file
137
147
138
148
139
- if __name__ == "__main__" :
149
+ def main ():
150
+ """
151
+ Main function to set up and run the file monitoring system.
152
+ """
140
153
if not exists (MONITOR_FOLDER_PATH ):
141
154
raise FileNotFoundError (f"The specified path does not exist: { MONITOR_FOLDER_PATH } " )
142
155
@@ -152,3 +165,7 @@ def on_any_event(self, event):
152
165
except KeyboardInterrupt :
153
166
observer .stop ()
154
167
observer .join ()
168
+
169
+
170
+ if __name__ == "__main__" :
171
+ main ()
0 commit comments