-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig_reader.py
99 lines (83 loc) · 3.45 KB
/
config_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from queue import Queue, Empty
from typing import Callable
from PyQt5.QtCore import pyqtBoundSignal
from PyQt5.QtWidgets import QPushButton, QTableWidget, QGridLayout, QStatusBar
from fabric import Connection
from paramiko.ssh_exception import NoValidConnectionsError
from utils import get_yaml_file, get_config_file
class ConfigReader:
    """Base class for one reader: a toolbar button plus queued SSH/GUI work.

    Each instance creates a ``QPushButton`` in row 0 of the shared grid
    layout and communicates with the rest of the application by putting
    work items on a shared queue. A work item is a dict with the keys
    ``'func'`` (a callable), ``'type'`` (``'signal'`` or ``'ssh'``) and,
    optionally, ``'arg'`` (a single argument for ``func``).

    NOTE(review): the consumer of the queue is not part of this class;
    presumably ``'ssh'`` items run on a worker and ``'signal'`` items are
    dispatched to the GUI thread -- confirm against the consumer.

    ``get_info`` and ``show_info`` are no-op hooks here.
    """

    # Third-party types are quoted (forward references) so that they are not
    # evaluated while the class body executes.
    store: dict
    config: dict
    c: "Connection"
    button: "QPushButton"
    table_widget: "QTableWidget"
    signal: "pyqtBoundSignal"
    queue: Queue
    name: str
    autosize_window: Callable
    status_bar: "QStatusBar"

    def __init__(self, config: dict, name: str):
        """Wire the reader to the shared widgets and insert its button.

        Args:
            config: Shared application dict. Keys read here:
                ``'autosize_window'``, ``'c'`` (via ``set_connection``),
                ``'tableWidget'``, ``'signal'``, ``'statusBar'``, ``'queue'``,
                ``'centralwidget'``, ``'widget_counter'`` and ``'gridLayout'``.
                ``config['widget_counter']`` is incremented by one.
            name: Object name of the button; underscores are shown as spaces
                in the button text.
        """
        self.config = config
        self.autosize_window = config['autosize_window']
        self.set_connection()
        self.table_widget = config['tableWidget']
        self.signal = config['signal']
        self.status_bar = config['statusBar']
        self.queue = config['queue']
        self.name = name

        # The button starts disabled; init_queue() queues the item that
        # enables it.
        self.button = QPushButton(config['centralwidget'])
        self.button.setEnabled(False)
        self.button.setObjectName(self.name)
        self.button.setText(self.name.replace('_', ' '))
        self.button.clicked.connect(self.button_pressed)

        # Claim the next free column of row 0 for this reader's button.
        column = config['widget_counter']
        config['widget_counter'] += 1

        grid_layout: QGridLayout = config['gridLayout']
        # The item currently at (0, column) is expected to be the spacer:
        # the button takes its cell and the spacer moves one column right.
        spacer = grid_layout.itemAtPosition(0, column).spacerItem()
        grid_layout.removeItem(spacer)
        grid_layout.addWidget(self.button, 0, column, 1, 1)
        grid_layout.addItem(spacer, 0, column + 1, 1, 1)
        # Re-add the table in row 1 so it spans the buttons and the spacer.
        grid_layout.removeWidget(self.table_widget)
        grid_layout.addWidget(self.table_widget, 1, 0, 1, column + 2)

    def set_connection(self):
        """Take the connection object from ``config['c']``."""
        self.c = self.config['c']

    def init_queue(self):
        """Queue the initial work: status message, ``get_info``, enable button."""
        self.queue.put({'func': self.status_bar.showMessage, 'type': 'signal', 'arg': f'{self.name}..'})
        self.queue.put({'func': self.get_info, 'type': 'ssh'})
        self.queue.put({'func': self.button.setEnabled, 'type': 'signal', 'arg': True})

    def get_info(self):
        """Hook queued as an ``'ssh'`` item; does nothing in this class."""
        pass

    def button_pressed(self):
        """Handle a click: disable the button and queue the display work.

        When this reader was also the previously clicked one
        (``config['last_clicked']``), ``get_info`` is queued again before
        ``show_info``. The button is re-enabled by the last queued item.
        """
        self.button.setEnabled(False)
        # NOTE(review): only the get_info re-queue is conditional here;
        # confirm that this matches the intended behavior.
        if self.config.get('last_clicked', '') == self.name:
            self.queue.put({'func': self.get_info, 'type': 'ssh'})
        self.queue.put({'func': self.show_info, 'type': 'signal'})
        self.queue.put({'func': self.button.setEnabled, 'type': 'signal', 'arg': True})
        self.config['last_clicked'] = self.name

    def show_info(self):
        """Hook queued as a ``'signal'`` item; does nothing in this class."""
        pass

    def clear_queue(self):
        """Discard every pending work item, marking each one as done."""
        while True:
            try:
                self.queue.get(block=False)
            except Empty:
                # Nothing left to discard.
                break
            self.queue.task_done()

    def _report_connection_error(self, error: Exception) -> None:
        """Drop all pending work and queue a status-bar message for *error*."""
        self.clear_queue()
        self.queue.put({'func': self.status_bar.showMessage, 'type': 'signal', 'arg': str(error)})

    def sudo(self, command: str):
        """Run ``self.c.sudo(command, hide=True)`` and return its result.

        Returns ``None`` when ``NoValidConnectionsError`` is raised; in that
        case the queue is cleared and the error text is queued for the
        status bar.
        """
        try:
            return self.c.sudo(command, hide=True)
        except NoValidConnectionsError as e:
            self._report_connection_error(e)
            return None

    def get_yaml_file(self, path: str):
        """Return ``get_yaml_file(self.c, path)`` from the ``utils`` module.

        Returns ``None`` when ``NoValidConnectionsError`` is raised; in that
        case the queue is cleared and the error text is queued for the
        status bar.
        """
        try:
            return get_yaml_file(self.c, path)
        except NoValidConnectionsError as e:
            self._report_connection_error(e)
            return None

    def get_config_file(self, path: str):
        """Return ``get_config_file(self.c, path)`` from the ``utils`` module.

        Returns ``None`` when ``NoValidConnectionsError`` is raised; in that
        case the queue is cleared and the error text is queued for the
        status bar.
        """
        try:
            return get_config_file(self.c, path)
        except NoValidConnectionsError as e:
            self._report_connection_error(e)
            return None