-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
2768 lines (2420 loc) · 119 KB
/
Copy pathmain.py
File metadata and controls
2768 lines (2420 loc) · 119 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import pygame
import os
import subprocess
import threading
from queue import Queue
import re
import time
import math
import glob
import sys
import pyperclip
import webbrowser
# ---- Engine / GTP settings ----
FONT_NAME = "simhei"
GTP_COMMAND_ANALYZE = "kata-analyze interval 20"
INITIAL_COMMANDS = "showboard"
REFRESH_INTERVAL_SECOND = 0.02

# ---- Board geometry and layout ratios ----
ROWS, COLS = 9, 7
ANALYSIS_PANEL_RATIO = 0.3  # width ratio of the analysis panel
ANNOUNCE_RATIO = 0.2  # width ratio of the announcement column

# Command line used to launch the engine; it is split on whitespace by the caller.
KATAGO_COMMAND = (
    "./resource/engine/katago.exe gtp"
    " -config ./resource/engine/engine2024.cfg"
    " -model ./resource/engine/b10c384nbt.bin.gz"
    " -override-config drawJudgeRule=WEIGHT"
)
NORMAL_MAX_VISITS = 1_000_000_000

# ---- Human-vs-AI mode ----
HUMAN_AI_ANALYZE_COMMAND = GTP_COMMAND_ANALYZE
HUMAN_AI_EVALUATION_VISITS = 500
# (label, visit budget) pairs, weakest first.
HUMAN_AI_DIFFICULTIES = list(zip(
    ("新手", "业余", "爱好者", "大师", "特级大师"),
    (50, 150, 500, 1500, 3000),
))

# ---- Rules data; squares are (row, col) with row 0 at the top ----
DRAW_MOVE_LIMIT = 300
BLUE_DEN = (8, 3)  # D1
RED_DEN = (0, 3)  # D9
DENS = dict(w=BLUE_DEN, b=RED_DEN)
TRAPS = {
    'w': {(8, 2), (7, 3), (8, 4)},  # C1, D2, E1
    'b': {(0, 2), (1, 3), (0, 4)},  # C9, D8, E9
}
# Two 3x2 ponds: rows 3-5, columns 1-2 and 4-5.
WATER = {(r, c) for r in range(3, 6) for c in (1, 2, 4, 5)}
# Piece letters ordered weakest to strongest, ranked 1..8.
PIECE_RANKS = {letter: rank for rank, letter in enumerate("rcdwjtle", start=1)}
PIECE_NAMES_CN = dict(zip("rcdwjtle", "鼠猫狗狼豹虎狮象"))
ANALYSIS_COLOR = (255, 255, 0, 100)

# ---- GTP console ----
GTP_CONSOLE_RATIO = 0.3  # height ratio of the GTP console
GTP_MAX_LENGTH = 100
GTP_FONT_SIZE = 16
FONT_SCALE = 0.8
SCROLL_SPEED = 3

# ---- Information panel ----
INFORMATION_PANEL_POS_RATIO = 0.5  # vertical position ratio of the info panel

# Piece letter -> image base name (lower-case letters map to the "...r" images).
PIECES = {
    'r': 'ratr',
    'c': 'catr',
    'd': 'dogr',
    'w': 'wolfr',
    'j': 'leopardr',
    't': 'tigerr',
    'l': 'lionr',
    'e': 'elephantr',
    'R': 'Rat',
    'C': 'Cat',
    'D': 'Dog',
    'W': 'Wolf',
    'J': 'Leopard',
    'T': 'Tiger',
    'L': 'Lion',
    'E': 'Elephant',
}
def get_opp(p):
    """Return the opposing side code ('w' <-> 'b'); None for anything else."""
    for side, other in (('w', 'b'), ('b', 'w')):
        if p == side:
            return other
    return None
def movestr_to_pos(move):
    """Convert a GTP-style vertex such as "D3" into ``(col, row)`` indices.

    Strings whose length is not 2 or 3 yield ``(None, None)``. Row 0 is the
    top rank, so the rank number is subtracted from ROWS.
    """
    if len(move) not in (2, 3):
        return (None, None)
    skipped_letter = ord('I') - ord('A')
    col = ord(move[0].upper()) - ord('A')
    assert (col != skipped_letter)
    if col > skipped_letter:  # the GTP coordinate alphabet has no letter I
        col -= 1
    # With a length of 2 or 3, everything after the first character is the rank.
    row = ROWS - int(move[1:])
    return col, row
def draw_arrow(arrow_surface, start_pos, end_pos, line_width, arrow_size, color=(128, 128, 128, 128)):
    """Draw a straight arrow (shaft plus triangular head) onto ``arrow_surface``.

    Nothing is drawn when both squared axis offsets between the two points
    are below 5. The shaft stops short of ``end_pos`` so that the head,
    whose tip sits on ``end_pos``, covers its end.
    """
    delta_x = end_pos[0] - start_pos[0]
    delta_y = end_pos[1] - start_pos[1]
    if delta_x * delta_x < 5 and delta_y * delta_y < 5:
        return

    heading = math.atan2(delta_y, delta_x)
    # Amount by which the shaft is shortened along the heading.
    shorten = line_width / 2 + arrow_size / 2
    shaft_end = (
        int(start_pos[0] + (delta_x - shorten * math.cos(heading))),
        int(start_pos[1] + (delta_y - shorten * math.sin(heading))),
    )
    tail = (int(start_pos[0]), int(start_pos[1]))
    tip = (int(end_pos[0]), int(end_pos[1]))

    # Shaft.
    pygame.draw.line(arrow_surface, color, tail, shaft_end, int(line_width))

    # Head: the tip plus two corners swept 30 degrees to either side.
    head = [tip]
    for sweep in (-math.pi / 6, math.pi / 6):
        head.append((
            tip[0] - arrow_size * math.cos(heading + sweep),
            tip[1] - arrow_size * math.sin(heading + sweep),
        ))
    pygame.draw.polygon(arrow_surface, color, head)
def draw_arrow2(screen, start_pos, end_pos, line_width, out_width, arrow_size, color=(128, 128, 128, 64),
                color_out=(255, 0, 0, 128)):
    """Draw a translucent arrow with an outline by nesting two arrows.

    A larger arrow in ``color_out`` is drawn first and a smaller arrow in
    ``color`` is drawn on top of it, both on a temporary per-pixel-alpha
    surface that is then blitted onto ``screen``.

    :param screen: pygame Surface to draw on
    :param start_pos: arrow start point (x1, y1)
    :param end_pos: arrow tip (x2, y2)
    :param line_width: shaft width of the inner arrow
    :param out_width: thickness of the outline around the inner arrow
    :param arrow_size: head size of the inner arrow
    :param color: RGBA colour of the inner arrow
    :param color_out: RGBA colour of the outline arrow
    """
    # The outer arrow is the inner one grown by the outline thickness.
    arrow_size2 = arrow_size + (2 * 3 ** 0.5) * out_width
    line_width2 = int(line_width + 2 * out_width)

    angle = math.atan2(end_pos[1] - start_pos[1], end_pos[0] - start_pos[0])
    cos_a, sin_a = math.cos(angle), math.sin(angle)
    # Pull the inner arrow in from both ends so the outline stays visible.
    end_pos2 = (end_pos[0] - 2 * out_width * cos_a, end_pos[1] - 2 * out_width * sin_a)
    start_pos2 = (start_pos[0] + out_width * cos_a, start_pos[1] + out_width * sin_a)

    # Temporary fully transparent surface the size of the screen.
    arrow_surface = pygame.Surface(screen.get_size(), pygame.SRCALPHA)
    arrow_surface.fill((0, 0, 0, 0))
    draw_arrow(arrow_surface, start_pos, end_pos, line_width2, arrow_size2, color=color_out)
    draw_arrow(arrow_surface, start_pos2, end_pos2, line_width, arrow_size, color=color)
    screen.blit(arrow_surface, (0, 0))
def maybe_first_start():
    """Return True when the OpenCL tuning directory holds no non-empty ``.txt`` file.

    A missing directory, or one without matching files, also yields True.
    """
    tuning_dir = r"./resource/engine/KataGoData/opencltuning"
    candidates = glob.glob(os.path.join(tuning_dir, "*.txt"))
    return not any(os.path.getsize(path) > 0 for path in candidates)
class Dandelion:
# 在类开头添加需要被其他方法调用的方法定义
def try_send_command(self, cmds, enable_lock=True):
cmds = cmds.split("\n")
for cmd in cmds:
try:
self.katago_process.stdin.write(cmd + "\n")
self.katago_process.stdin.flush()
if enable_lock:
with self.analysis_lock:
self.gtp_log.append(('sent', cmd.strip()))
else:
self.gtp_log.append(('sent', cmd.strip()))
except Exception as e:
self.show_error_dialog = True
self.error_message = f"Instruction sending failed: {str(e)}"
def show_error(self, message):
self.show_error_dialog = True
self.error_message = message
def player_name(self, player):
return "蓝方" if player == 'w' else "红方"
def gtp_color_for_player(self, player):
return 'B' if player == 'w' else 'W'
def is_piece_of_player(self, piece, player):
if piece == ' ':
return False
return (player == 'w' and piece.isupper()) or (player == 'b' and piece.islower())
def piece_owner(self, piece):
if piece == ' ':
return None
return 'w' if piece.isupper() else 'b'
def piece_rank(self, piece):
    """Return the rank of ``piece`` from PIECE_RANKS (case-insensitive), 0 if unknown."""
    kind = piece.lower()
    return PIECE_RANKS[kind] if kind in PIECE_RANKS else 0
def result_text(self, result):
if not result:
return ""
if result.get('type') == 'draw':
return f"和棋:{result.get('reason', '')}"
return f"{self.player_name(result.get('winner'))}胜:{result.get('reason', '')}"
def can_lion_tiger_jump_over_own_rat(self):
return self.game_rule in [2, 3]
def can_water_land_rats_capture(self):
return self.game_rule in [1, 3]
def is_water(self, row, col):
    """Tell whether the square at (row, col) is one of the WATER squares."""
    square = (row, col)
    return square in WATER
def is_own_den(self, player, row, col):
    """Tell whether (row, col) is the den belonging to ``player``."""
    return (row, col) == DENS[player]
def can_capture_piece(self, piece, target_piece, from_pos, to_pos):
if target_piece == ' ':
return True
player = self.piece_owner(piece)
target_player = self.piece_owner(target_piece)
if player is None or target_player is None or player == target_player:
return False
# A piece in the defender's own trap can be captured by any defender.
if to_pos in TRAPS[player]:
return True
piece_type = piece.lower()
target_type = target_piece.lower()
from_water = from_pos in WATER
to_water = to_pos in WATER
if piece_type == 'r' and target_type == 'r':
if from_water != to_water and not self.can_water_land_rats_capture():
return False
return True
if piece_type == 'r' and target_type == 'e':
return not from_water
if piece_type == 'e' and target_type == 'r':
return False
mover_rank = self.piece_rank(piece)
target_rank = self.piece_rank(target_piece)
if from_pos in TRAPS[target_player]:
mover_rank = 0
return mover_rank >= target_rank
def legal_move_destination(self, row, col, drow, dcol):
piece = self.board[row][col]
player = self.piece_owner(piece)
if player is None:
return None
target_row = row + drow
target_col = col + dcol
if not (0 <= target_row < ROWS and 0 <= target_col < COLS):
return None
if piece.lower() in ('l', 't') and self.is_water(target_row, target_col):
jump_row, jump_col = target_row, target_col
while 0 <= jump_row < ROWS and 0 <= jump_col < COLS and self.is_water(jump_row, jump_col):
blocker = self.board[jump_row][jump_col]
if blocker.lower() == 'r':
blocker_owner = self.piece_owner(blocker)
if blocker_owner != player or not self.can_lion_tiger_jump_over_own_rat():
return None
jump_row += drow
jump_col += dcol
if not (0 <= jump_row < ROWS and 0 <= jump_col < COLS):
return None
target_row, target_col = jump_row, jump_col
if self.is_own_den(player, target_row, target_col):
return None
target_piece = self.board[target_row][target_col]
if target_piece != ' ' and self.piece_owner(target_piece) == player:
return None
if self.is_water(target_row, target_col) and piece.lower() != 'r':
return None
if target_piece != ' ' and not self.can_capture_piece(piece, target_piece, (row, col), (target_row, target_col)):
return None
return target_row, target_col
def has_legal_move(self, player):
    """Return True if any piece of ``player`` has at least one legal one-step move."""
    directions = ((-1, 0), (1, 0), (0, -1), (0, 1))
    for r in range(ROWS):
        for c in range(COLS):
            if not self.is_piece_of_player(self.board[r][c], player):
                continue
            if any(self.legal_move_destination(r, c, dr, dc) is not None for dr, dc in directions):
                return True
    return False
def calculate_game_result(self):
    """Work out whether the current position is finished.

    Checks, in order: a 'w' piece in the red den, a 'b' piece in the blue
    den, the move count reaching DRAW_MOVE_LIMIT, and the side to move
    having no legal move. Returns a result dict with 'type', 'winner' and
    'reason', or None while the game is still running.
    """
    den_checks = (
        (RED_DEN, 'w', '进入红色兽穴D9'),
        (BLUE_DEN, 'b', '进入蓝色兽穴D1'),
    )
    for (den_row, den_col), invader, reason in den_checks:
        occupant = self.board[den_row][den_col]
        if occupant != ' ' and self.piece_owner(occupant) == invader:
            return {'type': 'win', 'winner': invader, 'reason': reason}

    if self.current_movenum >= DRAW_MOVE_LIMIT:
        return {'type': 'draw', 'winner': None, 'reason': '达到300步(150回合)'}

    mover = self.current_player
    if self.has_legal_move(mover):
        return None
    return {'type': 'win', 'winner': get_opp(mover), 'reason': f"{self.player_name(mover)}无子可动"}
def update_game_result(self):
self.game_result = self.calculate_game_result()
if self.game_result and self.mode == "human_ai":
self.human_ai_game_over = True
self.human_ai_ai_thinking = False
self.human_ai_status = self.result_text(self.game_result)
return self.game_result
def copy_board(self, board=None):
source = self.board if board is None else board
return [row.copy() for row in source]
def board_to_fen(self, board, player=None):
fen_rows = []
for row in board:
fen_row = []
empty = 0
for cell in row:
if cell == ' ':
empty += 1
else:
if empty > 0:
fen_row.append(str(empty))
empty = 0
fen_row.append(cell)
if empty > 0:
fen_row.append(str(empty))
fen_rows.append(''.join(fen_row))
fen = '/'.join(fen_rows)
if player is not None:
fen += f' {player}'
return fen
def coord_to_movestr(self, row, col):
    """Convert board indices to a vertex string: column letter from 'A', rank counted from the bottom."""
    file_letter = chr(ord('A') + col)
    rank_number = ROWS - row
    return f"{file_letter}{rank_number}"
def move_notation(self, move):
    """Build a two-character label for ``move``: piece name plus direction.

    The direction is taken from the row change first (up/down) and from the
    column change otherwise (left/right); '?' is used when nothing changed
    or the piece letter is unknown.
    """
    glyph = PIECE_NAMES_CN.get(move.get('piece', ' ').lower(), '?')
    from_row, from_col = move['start']
    to_row, to_col = move['end']
    if to_row != from_row:
        arrow = '上' if to_row < from_row else '下'
    elif to_col != from_col:
        arrow = '左' if to_col < from_col else '右'
    else:
        arrow = '?'
    return f"{glyph}{arrow}"
def reset_kifu_tree(self, board=None, player=None):
start_board = self.copy_board(board)
start_player = self.current_player if player is None else player
self.kifu_next_id = 1
self.kifu_nodes = {
0: {
'id': 0,
'parent': None,
'children': [],
'move': None,
'board': start_board,
'player': start_player,
'move_num': 0,
'last_move': None,
}
}
self.current_node_id = 0
self.view_node_id = 0
self.kifu_line_leaf_id = 0
self.move_history = []
def get_node_path_ids(self, node_id):
if node_id not in self.kifu_nodes:
return [0]
path = []
while node_id is not None and node_id in self.kifu_nodes:
path.append(node_id)
node_id = self.kifu_nodes[node_id]['parent']
return list(reversed(path))
def get_move_path(self, node_id):
moves = []
for path_id in self.get_node_path_ids(node_id)[1:]:
moves.append(self.kifu_nodes[path_id]['move'])
return moves
def is_node_on_line(self, node_id, line_ids):
return node_id in line_ids
def descend_first_child(self, node_id):
while node_id in self.kifu_nodes and self.kifu_nodes[node_id]['children']:
node_id = self.kifu_nodes[node_id]['children'][0]
return node_id
def displayed_line_ids(self):
leaf = self.kifu_line_leaf_id
if leaf not in self.kifu_nodes:
leaf = self.current_node_id if self.current_node_id in self.kifu_nodes else 0
self.kifu_line_leaf_id = leaf
line_ids = self.get_node_path_ids(leaf)
if self.view_node_id not in line_ids:
line_ids = self.get_node_path_ids(self.view_node_id)
return line_ids
def set_view_node(self, node_id, update_line=False):
if node_id not in self.kifu_nodes:
return
if self.selected_piece is not None:
self.unselect()
self.selected_piece = None
self.view_node_id = node_id
if update_line:
self.kifu_line_leaf_id = self.descend_first_child(node_id)
def kifu_nav_start(self):
self.set_view_node(0)
def kifu_nav_prev(self):
node = self.kifu_nodes.get(self.view_node_id)
if node and node['parent'] is not None:
self.set_view_node(node['parent'])
def kifu_nav_next(self):
line_ids = self.displayed_line_ids()
if self.view_node_id in line_ids:
index = line_ids.index(self.view_node_id)
if index + 1 < len(line_ids):
self.set_view_node(line_ids[index + 1])
return
node = self.kifu_nodes.get(self.view_node_id)
if node and node['children']:
self.set_view_node(node['children'][0], update_line=True)
def kifu_nav_latest(self):
self.set_view_node(self.current_node_id, update_line=True)
self.kifu_line_leaf_id = self.current_node_id
def get_display_node(self):
return self.kifu_nodes.get(self.view_node_id, self.kifu_nodes[0])
def is_viewing_current_node(self):
return self.view_node_id == self.current_node_id
def apply_node_to_live_state(self, node_id):
node = self.kifu_nodes.get(node_id)
if not node:
return
self.board = self.copy_board(node['board'])
self.current_player = node['player']
self.current_movenum = node['move_num']
self.last_move = node['last_move']
self.selected_piece = None
self.move_evaluation = None
self.move_history = self.get_move_path(node_id)
self.current_node_id = node_id
self.view_node_id = node_id
self.game_result = self.calculate_game_result()
def sync_engine_to_node(self, node_id, restart_analysis=True):
if node_id not in self.kifu_nodes:
return
root = self.kifu_nodes[0]
self.try_send_command("stop")
self.try_send_command("setfen " + self.board_to_fen(root['board'], root['player']))
for move in self.get_move_path(node_id):
color = self.gtp_color_for_player(move['player'])
sr, sc = move['start']
er, ec = move['end']
self.try_send_command(f"play {color} {self.coord_to_movestr(sr, sc)}")
self.try_send_command(f"play {color} {self.coord_to_movestr(er, ec)}")
with self.analysis_lock:
self.analysis_results.clear()
self.analysis_root_visits = 0
self.human_ai_root_visits = 0
self.human_ai_display_visits = 0
result = self.update_game_result()
if result:
self.try_send_command("stop")
elif restart_analysis and self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE)
def activate_view_node_for_branch(self, restart_analysis=True):
if self.is_viewing_current_node():
return
self.apply_node_to_live_state(self.view_node_id)
self.kifu_line_leaf_id = self.view_node_id
self.sync_engine_to_node(self.view_node_id, restart_analysis=restart_analysis)
self.ui_status = "已从历史局面创建新分支起点"
def same_tree_move(self, a, b):
return (
a.get('player') == b.get('player') and
a.get('start') == b.get('start') and
a.get('end') == b.get('end')
)
def record_move_in_kifu(self, move):
parent_id = self.current_node_id if self.current_node_id in self.kifu_nodes else 0
move = move.copy()
move['notation'] = self.move_notation(move)
existing_id = None
for child_id in self.kifu_nodes[parent_id]['children']:
if self.same_tree_move(self.kifu_nodes[child_id]['move'], move):
existing_id = child_id
break
if existing_id is None:
node_id = self.kifu_next_id
self.kifu_next_id += 1
self.kifu_nodes[parent_id]['children'].append(node_id)
self.kifu_nodes[node_id] = {
'id': node_id,
'parent': parent_id,
'children': [],
'move': move,
'board': self.copy_board(),
'player': self.current_player,
'move_num': self.current_movenum,
'last_move': self.last_move,
}
else:
node_id = existing_id
self.current_node_id = node_id
self.view_node_id = node_id
self.kifu_line_leaf_id = node_id
self.move_history = self.get_move_path(node_id)
def prompt_for_fen(self):
    """Ask the user for a FEN string in a modal dialog and apply it.

    A hidden Tk root window is created only for the duration of the dialog
    and is destroyed even if the dialog itself raises. An empty or
    cancelled input is ignored. Any failure is reported through
    ``self.show_error``.
    """
    try:
        import tkinter as tk
        from tkinter import simpledialog
        root = tk.Tk()
        try:
            root.withdraw()  # hide the main window; only the dialog is shown
            fen = simpledialog.askstring("设置局面", "请输入FEN字符串(格式:棋盘部分 当前玩家):")
        finally:
            # Always release the Tk root so a failed dialog does not leak it.
            root.destroy()
        if fen:
            self.apply_fen(fen)
    except Exception as e:
        self.show_error(f"无法创建输入框:{str(e)}")
def apply_fen(self, fen_str):
"""应用用户输入的FEN字符串"""
try:
# 分割FEN组成部分
parts = fen_str.strip().split()
if len(parts) < 1:
raise ValueError("FEN不能为空")
# 解析棋盘部分
board_part = parts[0]
rows = board_part.split('/')
if len(rows) != ROWS:
raise ValueError(f"需要{ROWS}行,实际{len(rows)}行")
new_board = []
for row in rows:
fen_row = []
empty = 0
for char in row:
if char.isdigit():
empty = empty * 10 + int(char)
else:
if empty > 0:
fen_row.extend([' '] * empty)
empty = 0
if char not in PIECES:
raise ValueError(f"无效棋子字符: {char}")
fen_row.append(char)
if empty > 0:
fen_row.extend([' '] * empty)
if len(fen_row) != COLS:
raise ValueError(f"行'{row}'列数错误,应有{COLS}列")
new_board.append(fen_row)
# 解析当前玩家(默认w)
current_player = 'w'
if len(parts) >= 2:
current_player = parts[1].lower()
if current_player not in ('w', 'b'):
raise ValueError("当前玩家应为w或b")
# 更新游戏状态
with self.analysis_lock:
self.board = new_board
self.current_player = current_player
self.selected_piece = None
self.last_move = None
self.current_movenum = 0
self.game_result = None
self.reset_kifu_tree(self.board, self.current_player)
# 同步到KataGo
self.sync_board_assume_locked()
self.try_send_command(f"setfen {self.get_fen()}", enable_lock=False)
result = self.update_game_result()
if result:
self.try_send_command("stop", enable_lock=False)
elif self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
except Exception as e:
self.show_error(f"FEN应用失败: {str(e)}")
def __init__(self):
    """Create the window, load resources, set up game state and start the engine.

    The order matters: pygame and the display are initialised first, sizes
    are computed before resources are loaded (images are scaled to them),
    the game/record state is built next, and only then is the engine
    process started and configured.
    """
    self.mode = "main"  # "main" or "editor" (other methods also compare against "human_ai")
    self.last_analysis_time = 0  # time of the last analysis update
    self.analysis_refresh_interval = 0.1  # refresh interval (seconds)
    self.last_refresh_time = 0  # time of the last board refresh
    # self.engine_ready = False # whether the engine has printed "GTP ready" on stderr
    pygame.init()
    # Initial window size
    self.screen_width = 1300
    self.screen_height = 850
    self.screen = pygame.display.set_mode((self.screen_width, self.screen_height), pygame.RESIZABLE)
    pygame.display.set_caption("Dandelion 斗兽棋")
    # Attributes that must exist before resources are loaded
    self.eval_images = {}  # To store evaluation images
    # Font cache, keyed by (name, size, bold)
    self.font_cache = {}
    # Compute the dynamic layout sizes
    self.calculate_sizes()
    # Load resources
    self.load_resources()
    # Initial game state; lower-case pieces occupy the top rows, upper-case the bottom rows
    self.initial_board = [
        ['l', ' ', ' ', ' ', ' ', ' ', 't'],
        [' ', 'd', ' ', ' ', ' ', 'c', ' '],
        ['r', ' ', 'j', ' ', 'w', ' ', 'e'],
        [' ', ' ', ' ', ' ', ' ', ' ', ' '],
        [' ', ' ', ' ', ' ', ' ', ' ', ' '],
        [' ', ' ', ' ', ' ', ' ', ' ', ' '],
        ['E', ' ', 'W', ' ', 'J', ' ', 'R'],
        [' ', 'C', ' ', ' ', ' ', 'D', ' '],
        ['T', ' ', ' ', ' ', ' ', ' ', 'L']
    ]
    self.board = [row.copy() for row in self.initial_board]
    self.selected_piece = None
    self.current_player = 'w'
    self.last_move = None  # information about the last move made
    self.flip_board = False  # flip-board flag
    self.move_history = []  # history of moves
    # Analysis system
    self.analyzing = True
    self.analysis_results = []
    self.analysis_root_visits = 0
    self.analysis_lock = threading.Lock()
    self.gtp_log = []  # GTP log storage
    self.scroll_offset = 0  # scroll position
    self.show_error_dialog = False
    self.error_message = ""
    self.aggressive_mode = 0  # aggressive mode: 0 balanced, 1 black aggressive, -1 white aggressive
    self.current_movenum = 0  # number of moves played so far
    self.movenum_limit = 300  # move limit (sent to the engine with the "mm" command)
    self.simple_mode = False  # compact-mode flag
    self.move_evaluation = None  # To store the evaluation of the last move
    self.game_result = None
    # Human-vs-AI mode state
    self.human_ai_phase = "setup"
    self.human_ai_player = 'w'
    self.human_ai_difficulty_index = 2
    self.human_ai_ai_thinking = False
    self.human_ai_ai_target_visits = HUMAN_AI_DIFFICULTIES[self.human_ai_difficulty_index][1]
    self.human_ai_root_visits = 0
    self.human_ai_display_visits = 0
    self.human_ai_status = "请选择执棋方、难度和开局方式"
    self.human_ai_buttons = {}
    self.human_ai_game_over = False
    self.main_buttons = {}
    self.kifu_buttons = {}
    self.ui_status = ""
    self.reset_kifu_tree(self.board, self.current_player)
    # kata-set-rule scoring 0: lion/tiger cannot jump over own rat; water and land rats cannot capture each other
    # kata-set-rule scoring 1: lion/tiger cannot jump over own rat; water and land rats can capture each other
    # kata-set-rule scoring 2: lion/tiger can jump over own rat; water and land rats cannot capture each other
    # kata-set-rule scoring 3: lion/tiger can jump over own rat; water and land rats can capture each other
    self.game_rule = 0
    # Start the engine, then push the default settings to it
    self.start_katago()
    self.set_movelimit(300)
    self.set_aggressive_mode(0)
    self.set_game_rule(0)
    self.set_game_drawrule("WEIGHT")  # initial draw rule is "WEIGHT"
    self.set_game_looprule("seventhree")
    # Editor-related variables
    self.dragging_piece = None
    self.drag_pos = (0, 0)
    self.selected_piece_type = None
    self.show_fen = True
    self.show_fen_message = False
    self.fen_message_time = 0
def calculate_sizes(self):
    """Derive every layout dimension from the current window size."""
    width, height = self.screen_width, self.screen_height

    # Square size: limited by the window height and by the width shared with the side columns.
    tile = min(height // ROWS, width // (COLS + 6))
    self.tile_size = tile

    # Side columns, each with a minimum width.
    self.announce_width = max(200, int(width * ANNOUNCE_RATIO))
    self.sidebar_width = max(400, int(width * ANALYSIS_PANEL_RATIO))

    # Board area.
    self.board_width = COLS * tile
    self.board_height = ROWS * tile

    # Information panel position and GTP console height.
    self.information_panel_pos = int(height * INFORMATION_PANEL_POS_RATIO)
    self.gtp_console_height = int(height * GTP_CONSOLE_RATIO)

    # Total width of the three columns.
    self.total_width = self.announce_width + self.board_width + self.sidebar_width
def load_resources(self):
    """Load the board, piece and evaluation images, scaled to the current layout.

    The board and piece images are required: a failure to load them
    propagates. The donation image and the move-evaluation icons are
    optional: if one cannot be loaded (missing file or undecodable image)
    it is stored as None instead.
    """
    # pygame reports an unreadable image as pygame.error, while a missing
    # file surfaces as FileNotFoundError on pygame 2; treat both the same
    # for optional images.
    optional_load_errors = (FileNotFoundError, pygame.error)

    # Board image, scaled to the board area.
    self.board_img = pygame.image.load("resource/pieces/board.jpg").convert()
    self.board_img = pygame.transform.scale(self.board_img, (self.board_width, self.board_height))

    # Donation image (optional).
    try:
        self.donate_img = pygame.image.load("resource/pieces/donate.jpg").convert_alpha()
        self.donate_img = pygame.transform.scale(self.donate_img, (180, 180))
    except optional_load_errors:
        self.donate_img = None

    # Piece images, slightly smaller than a board square.
    self.piece_images = {}
    for key, name in PIECES.items():
        img = pygame.image.load(f"resource/pieces/{name}.png").convert_alpha()
        self.piece_images[key] = pygame.transform.scale(img, (self.tile_size - 10, self.tile_size - 10))

    # Move-evaluation icons (optional).
    eval_image_names = ["nice", "brilliant", "best", "ok", "mistake", "blunder"]
    for name in eval_image_names:
        try:
            img = pygame.image.load(f"resource/pieces/{name}.png").convert_alpha()
            self.eval_images[name] = pygame.transform.scale(img, (50, 50))
        except optional_load_errors:
            self.eval_images[name] = None
            print(f"警告: 走法评估图片加载失败: resource/pieces/{name}.png")
# ========== 新增:安全获取字体的方法 ==========
def get_font(self, name, size, bold=False):
"""尝试多种方式获取字体,避免SysFont初始化失败"""
key = (name, size, bold)
if key in self.font_cache:
return self.font_cache[key]
font = None
# 先尝试用SysFont(可能触发异常)
try:
font = pygame.font.SysFont(name, size, bold=bold)
except Exception:
pass
# 如果失败,尝试直接加载simhei.ttf文件(常见路径)
if font is None:
try:
font = pygame.font.Font("C:/Windows/Fonts/simhei.ttf", size)
except Exception:
pass
# 最后回退到默认字体
if font is None:
try:
font = pygame.font.Font(None, size)
except Exception as e:
# 极罕见情况:连默认字体都失败,则创建一个最简单的字体(Pygame应能处理)
font = pygame.font.Font(None, size)
self.font_cache[key] = font
return font
def start_katago(self):
    """Launch the engine process and begin talking to it.

    Starts the process described by KATAGO_COMMAND with text-mode pipes,
    starts one daemon thread per output stream, sends the initial commands
    and, when analysis is enabled, the analyze command. A first-launch
    warning is logged beforehand if no tuning data is found. Any failure
    is reported through ``self.show_error``.
    """
    try:
        if maybe_first_start():
            self.gtp_log.append(("warning", "引擎第一次启动需要5~10分钟,请耐心等待"))
        argv = KATAGO_COMMAND.split()
        self.katago_process = subprocess.Popen(
            argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace',
            universal_newlines=True,
            bufsize=1
        )
        # One reader per stream: stdout first, then stderr.
        for reader in (self.read_output, self.read_stderr):
            threading.Thread(target=reader, daemon=True).start()
        self.try_send_command(INITIAL_COMMANDS)
        if self.analyzing:
            self.try_send_command(GTP_COMMAND_ANALYZE)
    except Exception as e:
        self.show_error(f"Failed to load Katago: {str(e)}")
def restart_game(self):
self.board = [row.copy() for row in self.initial_board]
self.selected_piece = None
self.current_player = 'w'
self.last_move = None
self.analysis_results = []
self.current_movenum = 0
self.move_evaluation = None # 重置走法评估
self.game_result = None
self.reset_kifu_tree(self.board, self.current_player)
self.try_send_command("clear_board")
self.set_movelimit(300)
def sync_board_assume_locked(self, undo_once=False):
move_num_before_sync = self.current_movenum
next_player_should_be = self.current_player
if undo_once:
next_player_should_be = self.current_player if self.selected_piece is not None else get_opp(
self.current_player)
self.analysis_results.clear()
self.selected_piece = None
self.current_player = next_player_should_be
self.current_movenum = move_num_before_sync
fen = self.get_fen(has_pla=False)
fen = f"{fen} {next_player_should_be}"
self.try_send_command("setfen " + fen, enable_lock=False)
def swap_side(self):
    """Hand the move to the other side and re-synchronise the engine.

    Everything runs under ``self.analysis_lock``: the side to move is
    flipped, the board is pushed to the engine, the move evaluation is
    cleared and the game result refreshed. A finished game stops the
    engine; otherwise analysis restarts when it is enabled.
    """
    with self.analysis_lock:
        self.current_player = get_opp(self.current_player)
        self.sync_board_assume_locked()
        self.move_evaluation = None  # the previous move's evaluation no longer applies
        if self.update_game_result():
            self.try_send_command("stop", enable_lock=False)
        elif self.analyzing:
            self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
def set_aggressive_mode(self, ag_mode):
with self.analysis_lock:
self.aggressive_mode = ag_mode
if self.aggressive_mode == 0:
self.try_send_command("komi 0.0", enable_lock=False)
self.try_send_command("kata-set-param playoutDoublingAdvantage 0.0", enable_lock=False)
elif self.aggressive_mode == 1:
self.try_send_command("komi 9.0", enable_lock=False)
self.try_send_command("kata-set-param playoutDoublingAdvantage -1.5", enable_lock=False)
elif self.aggressive_mode == -1:
self.try_send_command("komi -9.0", enable_lock=False)
self.try_send_command("kata-set-param playoutDoublingAdvantage 1.5", enable_lock=False)
if self.game_result:
self.try_send_command("stop", enable_lock=False)
elif self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
def set_movelimit(self, movelimit):
movelimit = movelimit - self.current_movenum
if movelimit > 999:
movelimit = 9
if movelimit < 1:
movelimit = 1
with self.analysis_lock:
self.sync_board_assume_locked()
self.movenum_limit = movelimit
self.try_send_command(f"mm {movelimit}", enable_lock=False)
self.try_send_command("mc 0", enable_lock=False)
result = self.update_game_result()
if result:
self.try_send_command("stop", enable_lock=False)
elif self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
def set_game_rule(self, rule):
with self.analysis_lock:
self.sync_board_assume_locked()
self.game_rule = rule
self.try_send_command(f"kata-set-rule scoring {rule}", enable_lock=False)
result = self.update_game_result()
if result:
self.try_send_command("stop", enable_lock=False)
elif self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
def set_game_drawrule(self, rule):
with self.analysis_lock:
self.sync_board_assume_locked()
self.game_drawrule = rule
self.try_send_command(f"kata-set-rule drawjudge {rule}", enable_lock=False)
result = self.update_game_result()
if result:
self.try_send_command("stop", enable_lock=False)
elif self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
def set_game_looprule(self, rule):
with self.analysis_lock:
self.sync_board_assume_locked()
self.game_looprule = rule
self.try_send_command(f"kata-set-rule looprule {rule}", enable_lock=False)
result = self.update_game_result()
if result:
self.try_send_command("stop", enable_lock=False)
elif self.analyzing:
self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
def read_stderr(self):
while True:
line = self.katago_process.stderr.readline()
if not line:
break
def read_output(self):
    """Read the engine's stdout line by line until the stream ends.

    Lines starting with "info" are handed to ``handle_analysis_line``.
    Every other line is appended to ``self.gtp_log`` as ``('recv', line)``
    under ``self.analysis_lock``. A line containing "illegal" first
    triggers a board re-synchronisation with the engine.
    """
    while True:
        line = self.katago_process.stdout.readline()
        if not line:
            # An empty read means end of stream: stop reading.
            break
        line = line.strip()
        if line.startswith("info"):
            self.handle_analysis_line(line)
        else:
            with self.analysis_lock:
                if "illegal" in line:
                    # The engine rejected a move: push the board back to it
                    # (with undo_once) and refresh the result and analysis.
                    print("Detect illegal move, sync with the engine")
                    self.sync_board_assume_locked(undo_once=True)
                    result = self.update_game_result()
                    if result:
                        self.try_send_command("stop", enable_lock=False)
                    elif self.analyzing:
                        self.try_send_command(GTP_COMMAND_ANALYZE, enable_lock=False)
                self.gtp_log.append(('recv', line))
                # Once the log holds more than 100 entries, drop the oldest
                # one (a single entry per received line).
                if len(self.gtp_log) > 100:
                    self.gtp_log.pop(0)
def handle_analysis_line(self, line):
if "rootInfo" in line:
root_match = re.search(r'rootInfo.*?\bvisits\s+(\d+)', line)
if root_match:
with self.analysis_lock:
self.analysis_root_visits = int(root_match.group(1))
if self.mode == "human_ai" and self.human_ai_ai_thinking:
self.human_ai_root_visits = self.analysis_root_visits
if "info" in line and "visits" in line and "winrate" in line:
pattern = re.compile(
r'info move (\w+)'
r'.*?visits (\d+)'
r'.*?winrate ([-\d.]+(?:[eE][-+]?\d+)?)'
r'.*?scoreMean ([-\d.]+(?:[eE][-+]?\d+)?)'
r'.*?lcb ([-\d.]+(?:[eE][-+]?\d+)?)'
r'.*?order (\d+)'
r'.*?pv ([\w\s]+?)(?=\s*(?:info|rootInfo|ownership|ownershipStdev|$))',
re.DOTALL
)
with self.analysis_lock:
if time.time() - self.last_analysis_time >= self.analysis_refresh_interval:
self.analysis_results.clear()
self.last_analysis_time = time.time()
for match in pattern.finditer(line):
move = match.group(1)
visits = int(match.group(2))
winrate = float(match.group(3)) * 100