1
+ import typer
2
+ from typer .core import TyperGroup
3
+ from click import Context
4
+ from rich .console import Console
5
+ from rich import print
6
+ from logging import getLogger
7
+
8
+ from naas_python .domains .asset .AssetSchema import (
9
+ IAssetDomain ,
10
+ IAssetPrimaryAdaptor ,
11
+ # IAssetInvoker,
12
+ Asset ,
13
+ AssetCreation ,
14
+ AssetUpdate
15
+ )
16
+
17
+ logger = getLogger (__name__ )
18
+
19
+ class OrderCommands (TyperGroup ):
20
+ def list_commands (self , ctx : Context ):
21
+ """Return list of commands in the order appear."""
22
+ return list (self .commands )
23
+
24
+ class TyperAssetAdaptor (IAssetPrimaryAdaptor ):
25
+ def __init__ (self , domain : IAssetDomain ):
26
+ super ().__init__ ()
27
+
28
+ self .domain = domain
29
+ self .console = Console ()
30
+
31
+ self .app = typer .Typer (
32
+ cls = OrderCommands ,
33
+ help = "Naas Asset CLI" ,
34
+ add_completion = False ,
35
+ no_args_is_help = True ,
36
+ pretty_exceptions_enable = False ,
37
+ rich_markup_mode = "rich" ,
38
+ context_settings = {"help_option_names" : ["-h" , "--help" ]},
39
+ )
40
+
41
+ self .app .command ("create" )(self .create_asset )
42
+ self .app .command ("delete" )(self .delete_asset )
43
+ self .app .command ("get" )(self .get_asset )
44
+ self .app .command ("update" )(self .update_asset )
45
+
46
+ def create_asset (self ,
47
+ workspace_id :str = typer .Option (None , "--workspace-id" , "-w" , help = "ID of the workspace" ),
48
+ # asset_creation: AssetCreation = typer.Option(..., "--object", "-o", help="Storage object to create an asset from."),
49
+ storage : str = typer .Option (None , "--storage" , "-s" , help = "Storage name to create an asset from. ie:\" data1\" " ),
50
+ object : str = typer .Option (None , "--object" , "-o" , help = "Object to create an asset from. ie:\" /dir1/tmp.txt\" " ),
51
+ version : str = typer .Option (None , "--object-version" , "-ov" , help = "Optional version of the storage object" ),
52
+ visibility : str = typer .Option (None , "--visibility" , "-vis" , help = "Optional visibility of the asset" ),
53
+ content_disposition : str = typer .Option (None , "--content-disposition" , "-cd" , help = "Optinal content disposition of the asset" ),
54
+ password : str = typer .Option (None , "--password" , "-p" , help = "Optional password to decrypt the storage object" ),
55
+ rich_preview : bool = typer .Option (
56
+ False ,
57
+ "--rich-preview" ,
58
+ "-rp" ,
59
+ help = "Rich preview of the information as a table" ,
60
+ ))-> Asset :
61
+ asset_creation_args :dict = {"asset_creation" : {}}
62
+ if workspace_id is not None :
63
+ asset_creation_args ["asset_creation" ]["workspace_id" ] = workspace_id
64
+ if storage is not None and object is not None :
65
+ asset_creation_args ["asset_creation" ]["storage_name" ] = storage
66
+ asset_creation_args ["asset_creation" ]["object_name" ] = object
67
+ if version is not None :
68
+ asset_creation_args ["asset_creation" ]["object_version" ] = version
69
+ if visibility is not None :
70
+ asset_creation_args ["asset_creation" ]["visibility" ] = visibility
71
+ if content_disposition is not None :
72
+ asset_creation_args ["asset_creation" ]["content_disposition" ] = content_disposition
73
+ if password is not None :
74
+ asset_creation_args ["asset_creation" ]["password" ] = password
75
+
76
+ asset_creation : AssetCreation = asset_creation_args
77
+ print ("creating asset..." )
78
+ asset = self .domain .create_asset (workspace_id , asset_creation )
79
+
80
+ def get_asset (self ,
81
+ workspace_id :str = typer .Option (None , "--workspace-id" , "-w" , help = "ID of the workspace" ),
82
+ asset_id : str = typer .Option (None , "--asset-id" , "-id" , help = "ID of the asset to get." ),
83
+ rich_preview : bool = typer .Option (
84
+ False ,
85
+ "--rich-preview" ,
86
+ "-rp" ,
87
+ help = "Rich preview of the information as a table" ,
88
+ ))-> Asset :
89
+ print ("getting asset..." )
90
+ asset = self .domain .get_asset (workspace_id , asset_id )
91
+ print (asset )
92
+
93
+ def update_asset (self ,
94
+ workspace_id :str = typer .Option (None , "--workspace-id" , "-w" , help = "ID of the workspace" ),
95
+ asset_id : str = typer .Option (None , "--asset-id" , "-id" , help = "ID of the asset to update." ),
96
+ visibility : str = typer .Option (None , "--visibility" , "-vis" , help = "Optional visibility of the asset" ),
97
+ content_disposition : str = typer .Option (None , "--content-disposition" , "-cd" , help = "Optinal content disposition of the asset" ),
98
+ # password: str = typer.Option(None, "--password", "-p", help="Optional password to decrypt the storage object"),
99
+ rich_preview : bool = typer .Option (
100
+ False ,
101
+ "--rich-preview" ,
102
+ "-rp" ,
103
+ help = "Rich preview of the information as a table" ,
104
+ ))-> Asset :
105
+
106
+ asset_update_args :dict = {"asset_update" : {}}
107
+ if visibility is not None :
108
+ asset_update_args ["asset_update" ]["visibility" ] = visibility
109
+ if content_disposition is not None :
110
+ asset_update_args ["asset_update" ]["content_disposition" ] = content_disposition
111
+ #TODO feature
112
+ # if password is not None:
113
+ # asset_update_args["asset_update"]["password"] = password
114
+ asset_update : AssetUpdate = asset_update_args
115
+ print ("updating asset..." )
116
+ asset = self .domain .update_asset (workspace_id , asset_id , asset_update )
117
+ print (asset )
118
+
119
+ def delete_asset (self ,
120
+ workspace_id :str = typer .Option (..., "--workspace-id" , "-w" , help = "ID of the workspace" ),
121
+ asset_id : str = typer .Option (..., "--asset-id" , "-id" , help = "ID of the asset to delete." ),
122
+ rich_preview : bool = typer .Option (
123
+ False ,
124
+ "--rich-preview" ,
125
+ "-rp" ,
126
+ help = "Rich preview of the information as a table" ,
127
+ ))-> None :
128
+ print ("deleting asset..." )
129
+ self .domain .delete_asset (workspace_id , asset_id )
130
+ print ("Done." )
0 commit comments