from abc import ABC, abstractmethod
+import datetime
+import json
+import logging

from modelgauge.annotation_pipeline import (
    AnnotatorAssigner,
    ...
    CsvPromptOutput,
)

+logger = logging.getLogger(__name__)
+

class PipelineRunner(ABC):
-    def __init__(self, num_workers, input_path, output_path, cache_dir, sut_options=None):
+    def __init__(self, num_workers, input_path, output_dir, cache_dir, sut_options, tag=None):
        self.num_workers = num_workers
        self.input_path = input_path
-        self.output_path = output_path
+        self.root_dir = output_dir
        self.cache_dir = cache_dir
        self.sut_options = sut_options
+        self.tag = tag
        self.pipeline_segments = []
+        self.start_time = datetime.datetime.now()
+        self.finish_time = None

        self._initialize_segments()

@@ -44,13 +52,46 @@ def num_total_items(self):
        """Total number of items to process."""
        pass

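+    # Base run metadata shared by all runners: run id, wall-clock timing, and the
+    # input file name and item count; subclasses merge in SUT/annotator details.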
+    def metadata(self):
+        duration = self.finish_time - self.start_time
+        hours, minutes, seconds = str(duration).split(":")
+        duration_string = f"{hours}h{minutes}m{seconds}s"
+
+        metadata = {
+            "run_id": self.run_id,
+            "run_info": {
+                "started": str(self.start_time),
+                "finished": str(self.finish_time),
+                "duration": duration_string,
+            },
+            "input": {
+                "source": self.input_path.name,
+                "num_items": self.num_input_items,
+            },
+        }
+        return metadata
+
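+    # Per-run output directory <root_dir>/<run_id>/, created on first use.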
+    def output_dir(self):
+        output_path = self.root_dir / self.run_id
+        if not output_path.exists():
+            logger.info(f"Creating output dir {output_path}")
+            output_path.mkdir(parents=True)
+        return output_path
+
    def run(self, progress_callback, debug):
        pipeline = Pipeline(
            *self.pipeline_segments,
            progress_callback=progress_callback,
            debug=debug,
        )
        pipeline.run()
+        self.finish_time = datetime.datetime.now()
+        logger.info(f"\noutput saved to {self.output_dir() / self.output_file_name}")
+        self._write_metadata()
+
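+    # Timestamp component of run_id values, e.g. "20250131-154500".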
+    @staticmethod
+    def format_date(date):
+        return date.strftime("%Y%m%d-%H%M%S")

    @abstractmethod
    def _initialize_segments(self):
@@ -62,7 +103,7 @@ def _add_prompt_segments(self, suts, include_sink=True):
        self.pipeline_segments.append(PromptSutAssigner(suts))
        self.pipeline_segments.append(PromptSutWorkers(suts, self.num_workers, cache_path=self.cache_dir))
        if include_sink:
-            output = CsvPromptOutput(self.output_path, suts)
+            output = CsvPromptOutput(self.output_dir() / self.output_file_name, suts)
            self.pipeline_segments.append(PromptSink(suts, output))

    def _add_annotator_segments(self, annotators, include_source=True):
@@ -71,9 +112,45 @@ def _add_annotator_segments(self, annotators, include_source=True):
            self.pipeline_segments.append(AnnotatorSource(input))
        self.pipeline_segments.append(AnnotatorAssigner(annotators))
        self.pipeline_segments.append(AnnotatorWorkers(annotators, self.num_workers))
-        output = JsonlAnnotatorOutput(self.output_path)
+        output = JsonlAnnotatorOutput(self.output_dir() / self.output_file_name)
        self.pipeline_segments.append(AnnotatorSink(annotators, output))

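+    # Counts below are read from the final pipeline segment (the sink) after a run.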
+    def _annotator_metadata(self):
+        counts = self.pipeline_segments[-1].annotation_counts
+        return {
+            "annotators": [
+                {
+                    "uid": uid,
+                }
+                for uid, annotator in self.annotators.items()
+            ],
+            "annotations": {
+                "count": sum(counts.values()),
+                "by_annotator": {uid: {"count": count} for uid, count in counts.items()},
+            },
+        }
+
+    def _sut_metadata(self):
+        counts = self.pipeline_segments[-1].sut_response_counts
+        return {
+            "suts": [
+                {
+                    "uid": uid,
+                    "initialization_record": sut.initialization_record.model_dump(),
+                    "sut_options": self.sut_options.model_dump(exclude_none=True),
+                }
+                for uid, sut in self.suts.items()
+            ],
+            "responses": {
+                "count": sum(counts.values()),
+                "by_sut": {uid: {"count": count} for uid, count in counts.items()},
+            },
+        }
+
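+    # Written to <output_dir>/metadata.json once the pipeline has finished.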
+    def _write_metadata(self):
+        with open(self.output_dir() / "metadata.json", "w") as f:
+            json.dump(self.metadata(), f, indent=4)
+

class PromptRunner(PipelineRunner):
    def __init__(self, *args, suts):
@@ -84,6 +161,19 @@ def __init__(self, *args, suts):
    def num_total_items(self):
        return self.num_input_items * len(self.suts)

+    @property
+    def output_file_name(self):
+        return "prompt-responses.csv"
+
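+    # Run directory name, e.g. "<timestamp>-<tag>-<sut uids>"; the tag is omitted when not set.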
+    @property
+    def run_id(self):
+        timestamp = self.format_date(self.start_time)
+        base_subdir_name = timestamp + "-" + self.tag if self.tag else timestamp
+        return f"{base_subdir_name}-{'-'.join(self.suts.keys())}"
+
+    def metadata(self):
+        return {**super().metadata(), **self._sut_metadata()}
+
    def _initialize_segments(self):
        self._add_prompt_segments(self.suts, include_sink=True)

@@ -98,6 +188,19 @@ def __init__(self, *args, suts, annotators):
    def num_total_items(self):
        return self.num_input_items * len(self.suts) * len(self.annotators)

+    @property
+    def output_file_name(self):
+        return "prompt-responses-annotated.jsonl"
+
+    @property
+    def run_id(self):
+        timestamp = self.format_date(self.start_time)
+        base_subdir_name = timestamp + "-" + self.tag if self.tag else timestamp
+        return f"{base_subdir_name}-{'-'.join(self.suts.keys())}-{'-'.join(self.annotators.keys())}"
+
+    def metadata(self):
+        return {**super().metadata(), **self._sut_metadata(), **self._annotator_metadata()}
+
    def _initialize_segments(self):
        # Hybrid pipeline: prompt source + annotator sink
        self._add_prompt_segments(self.suts, include_sink=False)
@@ -113,5 +216,18 @@ def __init__(self, *args, annotators):
    def num_total_items(self):
        return self.num_input_items * len(self.annotators)

+    @property
+    def output_file_name(self):
+        return "annotations.jsonl"
+
+    @property
+    def run_id(self):
+        timestamp = self.format_date(self.start_time)
+        base_subdir_name = timestamp + "-" + self.tag if self.tag else timestamp
+        return f"{base_subdir_name}-{'-'.join(self.annotators.keys())}"
+
+    def metadata(self):
+        return {**super().metadata(), **self._annotator_metadata()}
+
    def _initialize_segments(self):
        self._add_annotator_segments(self.annotators, include_source=True)
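A minimal usage sketch of the revised interface, using the PromptRunner defined above; the worker count, paths, tag, SUT mapping, and the SUTOptions import location are illustrative assumptions rather than values taken from this change:

from pathlib import Path

from modelgauge.sut import SUTOptions  # import location assumed

# Placeholder: a real, already-constructed SUT instance goes here.
suts = {"demo_sut": ...}

runner = PromptRunner(
    4,                    # num_workers
    Path("prompts.csv"),  # input_path
    Path("runs"),         # output_dir: each run writes into runs/<run_id>/
    Path(".cache"),       # cache_dir
    SUTOptions(),         # sut_options is now a required positional argument
    "demo",               # optional tag, folded into the run_id
    suts=suts,
)
runner.run(progress_callback=lambda *_: None, debug=False)
# Responses are written to runs/<run_id>/prompt-responses.csv and run details
# to runs/<run_id>/metadata.json.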