Skip to content

Commit 456f17e

Browse files
author
Chris Turner
authored
Merge pull request #314 from datajoint/stage-external-storage-jobid
Stage -> External Storage
2 parents b4838e4 + e9616fb commit 456f17e

File tree

10 files changed

+187
-39
lines changed

10 files changed

+187
-39
lines changed

+dj/+internal/AutoPopulate.m

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,10 @@ function parpopulate(self, varargin)
149149
% key=null : blob # structure containing the key
150150
% error_message="" : varchar(1023) # error message returned if failed
151151
% error_stack=null : blob # error stack if failed
152+
% user="" : varchar(255) # database user
152153
% host="" : varchar(255) # system hostname
153154
% pid=0 : int unsigned # system process id
155+
% connection_id=0 : bigint unsigned # database connection id
154156
% timestamp=CURRENT_TIMESTAMP : timestamp # automatic timestamp
155157
%
156158
% A job is considered to be available when <package>.Jobs contains
@@ -390,8 +392,10 @@ function cleanup(self, key)
390392
catch
391393
[~,host] = system('hostname');
392394
end
395+
jobKey.user = self.schema.conn.user;
393396
jobKey.host = strtrim(host);
394397
jobKey.pid = feature('getpid');
398+
jobKey.connection_id = self.schema.conn.serverId;
395399
end
396400
if ismember('error_key', self.jobs.header.names)
397401
% for backward compatibility with versions prior to 2.6.3
@@ -420,8 +424,10 @@ function createJobTable(self)
420424
fprintf(f, 'key=null : blob # structure containing the key\n');
421425
fprintf(f, 'error_message="" : varchar(1023) # error message returned if failed\n');
422426
fprintf(f, 'error_stack=null : blob # error stack if failed\n');
427+
fprintf(f, 'user="" : varchar(255) # database user\n');
423428
fprintf(f, 'host="" : varchar(255) # system hostname\n');
424429
fprintf(f, 'pid=0 : int unsigned # system process id\n');
430+
fprintf(f, 'connection_id=0 : bigint unsigned # database connection id\n');
425431
fprintf(f, 'timestamp=CURRENT_TIMESTAMP : timestamp # automatic timestamp\n');
426432
fprintf(f, '%%}\n\n');
427433
fprintf(f, 'classdef Jobs < dj.Jobs\n');

+dj/Connection.m

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use_tls
88
inTransaction = false
99
connId % connection handle
10+
serverId % database connection id
1011
packages % maps database names to package names
1112
schemas % registered schema objects
1213

@@ -164,10 +165,16 @@ function reload(self)
164165
% The same connection is re-used by all DataJoint objects.
165166
if ~self.isConnected
166167
self.connId=mym(-1, 'open', self.host, self.user, self.password, self.use_tls);
168+
167169
if ~isempty(self.initQuery)
168-
self.query(self.initQuery);
170+
mym(self.connId, self.initQuery);
169171
end
172+
173+
tmp = mym(self.connId, 'SELECT CONNECTION_ID() as id');
174+
self.serverId = uint64(tmp.id);
175+
170176
end
177+
171178
v = varargin;
172179
if dj.config('queryBigint_to_double')
173180
v{end+1} = 'bigint_to_double';

+dj/conn.m

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@
112112
end
113113

114114
if nargout==0
115-
query(connObj, 'SELECT connection_id()')
115+
fprintf('database connection id: %d\n', connObj.serverId);
116116
end
117117

118118
end
119-

+dj/kill.m

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,45 +26,50 @@
2626

2727
function kill(restriction, connection, order_by)
2828

29-
if nargin < 3
30-
order_by = {};
31-
end
29+
if nargin < 3
30+
order_by = {};
31+
end
3232

33-
if nargin < 2
34-
connection = dj.conn;
35-
end
33+
if nargin < 2
34+
connection = dj.conn;
35+
end
3636

37-
qstr = 'SELECT * FROM information_schema.processlist WHERE id <> CONNECTION_ID()';
37+
qstr = 'SELECT * FROM information_schema.processlist WHERE id <> CONNECTION_ID()';
3838

39-
if nargin && ~isempty(restriction)
40-
qstr = sprintf('%s AND (%s)', qstr, restriction);
41-
end
39+
if nargin && ~isempty(restriction)
40+
qstr = sprintf('%s AND (%s)', qstr, restriction);
41+
end
4242

43-
if isempty(order_by)
44-
qstr = sprintf('%s ORDER BY id', qstr);
45-
else
46-
if iscell(order_by)
47-
qstr = sprintf('%s ORDER BY %s', qstr, strjoin(order_by, ','));
43+
if isempty(order_by)
44+
qstr = sprintf('%s ORDER BY id', qstr);
4845
else
49-
qstr = sprintf('%s ORDER BY %s', qstr, order_by);
46+
if iscell(order_by)
47+
qstr = sprintf('%s ORDER BY %s', qstr, strjoin(order_by, ','));
48+
else
49+
qstr = sprintf('%s ORDER BY %s', qstr, order_by);
50+
end
5051
end
51-
end
5252

53-
while true
54-
query(connection, qstr)
55-
id = input('process to kill (''q''-quit, ''a''-all) > ', 's');
56-
if ischar(id) && strncmpi(id, 'q', 1)
57-
break
58-
elseif ischar(id) && strncmpi(id, 'a', 1)
59-
res = query(connection, qstr);
60-
id = double(res.ID)';
61-
for i = id
62-
query(connection, 'kill {Si}', i)
53+
while true
54+
query(connection, qstr)
55+
id = input('process to kill (''q''-quit, ''a''-all) > ', 's');
56+
if ischar(id) && strncmpi(id, 'q', 1)
57+
break
58+
elseif ischar(id) && strncmpi(id, 'a', 1)
59+
res = query(connection, qstr);
60+
61+
res = cell2struct(struct2cell(res), lower(fieldnames(res)));
62+
63+
id = double(res.id)';
64+
for i = id
65+
query(connection, 'kill {Si}', i)
66+
end
67+
break
68+
end
69+
id = sscanf(id,'%d');
70+
if ~isempty(id)
71+
query(connection, 'kill {Si}', id(1))
6372
end
64-
break
65-
end
66-
id = sscanf(id,'%d');
67-
if ~isempty(id)
68-
query(connection, 'kill {Si}', id(1))
6973
end
74+
7075
end

+dj/kill_quick.m

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
% Kill database connections without prompting.
2+
% dj.kill_quick() kills MySQL server connections matching 'restriction',
3+
% returning the number of terminated connections.
4+
%
5+
% Restrictions are specified as strings and can involve any of the attributes
6+
% of information_schema.processlist: ID, USER, HOST, DB, COMMAND, TIME,
7+
% STATE, INFO.
8+
%
9+
% Examples:
10+
% dj.kill_quick('HOST LIKE "%compute%"') terminates connections from hosts
11+
% containing "compute" in their hostname.
12+
%
13+
% dj.kill_quick('TIME > 600') terminates all connections older than 10
14+
% minutes.
15+
16+
function nkill = kill_quick(restriction, connection)
17+
18+
if nargin < 2
19+
connection = dj.conn;
20+
end
21+
22+
qstr = 'SELECT * FROM information_schema.processlist WHERE id <> CONNECTION_ID()';
23+
24+
if nargin && ~isempty(restriction)
25+
qstr = sprintf('%s AND (%s)', qstr, restriction);
26+
end
27+
28+
res = query(connection, qstr);
29+
30+
res = cell2struct(struct2cell(res), lower(fieldnames(res)));
31+
32+
nkill = 0;
33+
for id = double(res.id)'
34+
query(connection, 'kill {Si}', id);
35+
nkill = nkill + 1;
36+
end
37+
end

docs-parts/intro/Releases_lang1.rst

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
3.4.0 -- December 9, 2020
2-
-------------------------
1+
3.4.0 -- December 11, 2020
2+
--------------------------
33
* Minor: Add dj.config to be compatible with dj-python and removed dj.set (#186) #188
44
* Minor: Add UUID DataJoint datatype (#180) PR #194
55
* Minor: Add file external storage (#143) PR #197
66
* Minor: Add S3 external storage (#88) PR #207
77
* Minor: Improve dependency version compatibility handling (#228) PR #285
88
* Minor: Add unique and nullable options for foreign keys (#110) PR #303
99
* Minor: Add non-interactive option for dj.new (#69) #317
10+
* Minor: Add dj.kill_quick (#251) PR #314
11+
* Minor: Log connection ID, user in jobs table (#87, #275) PR #314
1012
* Bugfix: Handle empty password (#250) PR #279, #292
1113
* Bugfix: Disable GUI password if running headless (#278) PR #280, #292
1214
* Bugfix: Add order_by option to dj.kill output (#229) PR #248, #292
@@ -43,4 +45,4 @@
4345
3.2.2 -- February 5, 2019
4446
-------------------------
4547

46-
`Previous release notes TBD`
48+
`Previous release notes TBD`

tests/Main.m

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
TestExternalFile & ...
66
TestExternalS3 & ...
77
TestFetch & ...
8+
TestPopulate & ...
89
TestProjection & ...
910
TestRelationalOperand & ...
1011
TestSchema & ...
1112
TestTls & ...
1213
TestUuid
13-
end
14+
end

tests/TestPopulate.m

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
classdef TestPopulate < Prep
2+
methods(Test)
3+
function TestPopulate_testPopulate(testCase)
4+
st = dbstack;
5+
disp(['---------------' st(1).name '---------------']);
6+
package = 'Lab';
7+
8+
dj.createSchema(package,[testCase.test_root '/test_schemas'], ...
9+
[testCase.PREFIX '_lab']);
10+
11+
lab_schema = Lab.getSchema; % we need schema's connection id
12+
sid = lab_schema.conn.serverId;
13+
14+
insert(Lab.Subject, {
15+
100, '2010-04-02';
16+
});
17+
18+
insert(Lab.Rig, struct( ...
19+
'rig_manufacturer', 'FooLab', ...
20+
'rig_model', '1.0', ...
21+
'rig_note', 'FooLab Frobnicator v1.0' ...
22+
));
23+
24+
% parallel populate of 1 record
25+
% .. (SessionAnalysis logs session ID as session_analysis data)
26+
% NOTE: need to call parpopulate 1st to ensure Jobs table
27+
% exists
28+
29+
insert(Lab.Session, struct( ...
30+
'session_id', 0, ...
31+
'subject_id', 100, ...
32+
'rig_manufacturer', 'FooLab', ...
33+
'rig_model', '1.0' ...
34+
));
35+
36+
parpopulate(Lab.SessionAnalysis);
37+
a_result = fetch(Lab.SessionAnalysis & 'session_id = 0', '*');
38+
testCase.verifyEqual(a_result.session_analysis.connection_id, sid);
39+
40+
% regular populate of 1 record
41+
% .. (SessionAnalysis logs jobs record as session_analysis data)
42+
43+
insert(Lab.Session, struct( ...
44+
'session_id', 1, ...
45+
'subject_id', 100, ...
46+
'rig_manufacturer', 'FooLab', ...
47+
'rig_model', '1.0' ...
48+
));
49+
50+
populate(Lab.SessionAnalysis);
51+
a_result = fetch(Lab.SessionAnalysis & 'session_id = 1', '*');
52+
testCase.verifyEqual(a_result.session_analysis, 1);
53+
54+
end
55+
end
56+
end

tests/test_schemas/+Lab/Session.m

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
%{
2+
# Session
3+
session_id: int
4+
---
5+
-> Lab.Subject
6+
-> Lab.Rig
7+
%}
8+
classdef Session < dj.Manual
9+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
%{
2+
# SessionAnalysis
3+
-> Lab.Session
4+
---
5+
session_analysis: longblob
6+
%}
7+
classdef SessionAnalysis < dj.Computed
8+
methods(Access=protected)
9+
function makeTuples(self,key)
10+
11+
c = self.schema.conn;
12+
r = sprintf('connection_id = %d', c.serverId);
13+
14+
j = fetch(Lab.Jobs() & r, '*');
15+
16+
if isempty(j)
17+
key.session_analysis = key.session_id;
18+
else
19+
key.session_analysis = j;
20+
end
21+
22+
insert(self, key);
23+
24+
end
25+
end
26+
end

0 commit comments

Comments
 (0)