99import cwipc .net .source_passthrough
1010import cwipc .net .source_lldplay
1111import cwipc .net .source_decoder
12- from typing import Optional , NamedTuple
12+ import cwipc .net .source_synchronizer
13+ from typing import Optional , NamedTuple , List
1314
1415class ReceiverStatistics (NamedTuple ):
1516 timestamp : int
@@ -22,44 +23,46 @@ def __init__(self, args: argparse.Namespace):
2223 self .name = "testlatency.ReceiverThread"
2324 self .args = args
2425 self .exit_status = - 1
25- self .source : Optional [ cwipc_rawsource_abstract ] = None
26- self .decoder : Optional [cwipc_source_abstract ] = None
26+ self .needs_synchronizer = self . args . tiled or self . args . synchronizer
27+ self .pc_source : Optional [cwipc_source_abstract ] = None
2728 self .statistics : list [ReceiverStatistics ] = []
2829 self .stop_requested = False
2930 self .last_timestamp : Optional [int ] = None
3031
3132 def init (self ):
3233 url = "http://127.0.0.1:9000/lldash_testlatency.mpd"
33- self .source = cwipc .net .source_lldplay .cwipc_source_lldplay (url , verbose = self .args .debug )
3434 if self .args .uncompressed :
35- self . decoder = cwipc .net .source_passthrough .cwipc_source_passthrough ( self . source , verbose = self . args . debug )
35+ decoder_factory = cwipc .net .source_passthrough .cwipc_source_passthrough
3636 else :
37- self . decoder = cwipc .net .source_decoder .cwipc_source_decoder ( self . source , verbose = self . args . debug )
37+ decoder_factory = cwipc .net .source_decoder .cwipc_source_decoder
3838
39- assert self .decoder
40- if hasattr (self .decoder , 'start' ):
41- self .decoder .start ()
39+ if self .needs_synchronizer :
40+ raw_multisource = cwipc .net .source_lldplay .cwipc_multisource_lldplay (url , verbose = self .args .debug )
41+ n_tile = raw_multisource .get_tile_count ()
42+ if self .args .verbose :
43+ print (f"testlatency: receiver: multisource has { n_tile } tiles" , file = sys .stderr )
44+ decoders : List [cwipc_source_abstract ] = []
45+ for i in range (n_tile ):
46+ raw_source = raw_multisource .get_tile_source (i )
47+ decoder = decoder_factory (raw_source , verbose = self .args .debug )
48+ decoders .append (decoder )
49+ self .pc_source = cwipc .net .source_synchronizer .cwipc_source_synchronizer (raw_source , decoders , verbose = self .args .debug )
4250 else :
43- self .source .start ()
51+ raw_source = cwipc .net .source_lldplay .cwipc_source_lldplay (url , verbose = self .args .debug )
52+ self .pc_source = decoder_factory (raw_source , verbose = self .args .debug )
53+ assert self .pc_source
54+ self .pc_source .start ()
4455
4556 def stop (self ):
4657 self .stop_requested = True
4758
4859 def close (self ):
4960 if self .args .verbose :
50- assert self .source
51- self .source .statistics ()
52- if hasattr (self .decoder , 'statistics' ):
53- self .decoder .statistics ()
54- # self.source.stop()
55- if self .decoder and hasattr (self .decoder , 'stop' ):
56- self .decoder .stop ()
57-
58- if self .source and hasattr (self .source , 'free' ):
59- self .source .free ()
60- self .source = None
61- self .decoder .free ()
62- self .decoder = None
61+ self .pc_source .statistics ()
62+ if self .pc_source :
63+ self .pc_source .stop ()
64+ self .pc_source .free ()
65+ self .pc_source = None
6366
6467 def report (self , num : int , timestamp_ms : int , count : int ):
6568 now = time .time ()
@@ -77,17 +80,16 @@ def run(self):
7780 if self .args .debug :
7881 print ("testlatency: Starting receiver..." , file = sys .stderr )
7982 self .init ()
80- assert self .source
81- assert self .decoder
83+ assert self .pc_source
8284 start_time = time .time ()
8385 num = 0
8486 self .exit_status = 0
85- while not self .decoder .eof () and not self .stop_requested :
86- if not self .decoder .available (True ):
87+ while not self .pc_source .eof () and not self .stop_requested :
88+ if not self .pc_source .available (True ):
8789 continue
88- pc = self .decoder .get ()
90+ pc = self .pc_source .get ()
8991 if pc == None :
90- if not self .decoder .eof ():
92+ if not self .pc_source .eof ():
9193 print ("testlatency: receiver: No point cloud received but not eof(), aborting..." , file = sys .stderr )
9294 self .exit_status = - 1
9395 break
0 commit comments