Skip to content

Commit 2ab5182

Browse files
ahydraxvytautask
authored andcommitted
Implemented aggregation for counter table (#66)
* Updated dependencies * Implemeted counters aggregation * Cleanup Replaced tabs by spaces, unified variable names, etc.
1 parent 327dce6 commit 2ab5182

File tree

6 files changed

+680
-635
lines changed

6 files changed

+680
-635
lines changed

src/Hangfire.PostgreSql/ExpirationManager.cs

Lines changed: 112 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -30,69 +30,72 @@
3030
namespace Hangfire.PostgreSql
3131
{
3232
#if (NETCORE1 || NETCORE50 || NETSTANDARD1_5 || NETSTANDARD1_6)
33-
public
33+
public
3434
#else
3535
internal
3636
#endif
37-
class ExpirationManager : IBackgroundProcess, IServerComponent
38-
{
39-
private static readonly TimeSpan DelayBetweenPasses = TimeSpan.FromSeconds(1);
40-
private const int NumberOfRecordsInSinglePass = 1000;
37+
class ExpirationManager : IBackgroundProcess, IServerComponent
38+
{
39+
private static readonly TimeSpan DelayBetweenPasses = TimeSpan.FromSeconds(1);
40+
private const int NumberOfRecordsInSinglePass = 1000;
4141

4242
#if (NETCORE1 || NETCORE50 || NETSTANDARD1_5 || NETSTANDARD1_6)
4343
private static readonly ILog Logger = LogProvider.GetLogger(typeof(ExpirationManager));
4444
#else
4545
private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();
4646
#endif
47+
48+
private static readonly string[] ProcessedCounters =
49+
{
50+
"stats:succeeded",
51+
"stats:deleted",
52+
};
53+
4754
private static readonly string[] ProcessedTables =
48-
{
49-
"counter",
50-
"job",
51-
"list",
52-
"set",
53-
"hash",
54-
};
55-
56-
private readonly PostgreSqlStorage _storage;
57-
private readonly TimeSpan _checkInterval;
58-
private readonly PostgreSqlStorageOptions _options;
59-
60-
public ExpirationManager(PostgreSqlStorage storage, PostgreSqlStorageOptions options)
61-
: this(storage, options, TimeSpan.FromHours(1))
62-
{
63-
}
64-
65-
public ExpirationManager(PostgreSqlStorage storage, PostgreSqlStorageOptions options, TimeSpan checkInterval)
66-
{
67-
if (storage == null) throw new ArgumentNullException(nameof(storage));
68-
if (options == null) throw new ArgumentNullException(nameof(options));
69-
70-
_options = options;
71-
_storage = storage;
72-
_checkInterval = checkInterval;
73-
}
74-
75-
public void Execute(BackgroundProcessContext context)
76-
{
77-
Execute(context.CancellationToken);
78-
}
79-
80-
public void Execute(CancellationToken cancellationToken)
81-
{
82-
foreach (var table in ProcessedTables)
83-
{
84-
Logger.DebugFormat("Removing outdated records from table '{0}'...", table);
85-
86-
int removedCount = 0;
87-
88-
do
89-
{
90-
using (var storageConnection = (PostgreSqlConnection)_storage.GetConnection())
91-
{
92-
using (var transaction = storageConnection.Connection.BeginTransaction(IsolationLevel.ReadCommitted))
93-
{
94-
removedCount = storageConnection.Connection.Execute(
95-
string.Format(@"
55+
{
56+
"counter",
57+
"job",
58+
"list",
59+
"set",
60+
"hash",
61+
};
62+
63+
private readonly PostgreSqlStorage _storage;
64+
private readonly PostgreSqlStorageOptions _options;
65+
private readonly TimeSpan _checkInterval;
66+
67+
public ExpirationManager(PostgreSqlStorage storage, PostgreSqlStorageOptions options)
68+
: this(storage, options, TimeSpan.FromHours(1))
69+
{
70+
}
71+
72+
public ExpirationManager(PostgreSqlStorage storage, PostgreSqlStorageOptions options, TimeSpan checkInterval)
73+
{
74+
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
75+
_options = options ?? throw new ArgumentNullException(nameof(options));
76+
_checkInterval = checkInterval;
77+
}
78+
79+
public override string ToString() => "SQL Records Expiration Manager";
80+
81+
public void Execute(BackgroundProcessContext context) => Execute(context.CancellationToken);
82+
83+
public void Execute(CancellationToken cancellationToken)
84+
{
85+
foreach (var table in ProcessedTables)
86+
{
87+
Logger.DebugFormat("Removing outdated records from table '{0}'...", table);
88+
89+
int removedCount = 0;
90+
91+
do
92+
{
93+
using (var storageConnection = (PostgreSqlConnection)_storage.GetConnection())
94+
{
95+
using (var transaction = storageConnection.Connection.BeginTransaction(IsolationLevel.ReadCommitted))
96+
{
97+
removedCount = storageConnection.Connection.Execute(
98+
string.Format(@"
9699
DELETE FROM """ + _options.SchemaName + @""".""{0}""
97100
WHERE ""id"" IN (
98101
SELECT ""id""
@@ -101,26 +104,59 @@ DELETE FROM """ + _options.SchemaName + @""".""{0}""
101104
LIMIT {1}
102105
)", table, NumberOfRecordsInSinglePass.ToString(CultureInfo.InvariantCulture)), transaction);
103106

104-
transaction.Commit();
105-
}
106-
}
107-
108-
if (removedCount > 0)
109-
{
110-
Logger.InfoFormat("Removed {0} outdated record(s) from '{1}' table.", removedCount, table);
111-
112-
cancellationToken.WaitHandle.WaitOne(DelayBetweenPasses);
113-
cancellationToken.ThrowIfCancellationRequested();
114-
}
115-
} while (removedCount != 0);
116-
}
117-
118-
cancellationToken.WaitHandle.WaitOne(_checkInterval);
119-
}
120-
121-
public override string ToString()
122-
{
123-
return "SQL Records Expiration Manager";
124-
}
125-
}
126-
}
107+
transaction.Commit();
108+
}
109+
}
110+
111+
if (removedCount > 0)
112+
{
113+
Logger.InfoFormat("Removed {0} outdated record(s) from '{1}' table.", removedCount, table);
114+
115+
cancellationToken.WaitHandle.WaitOne(DelayBetweenPasses);
116+
cancellationToken.ThrowIfCancellationRequested();
117+
}
118+
} while (removedCount != 0);
119+
}
120+
AggregateCounters(cancellationToken);
121+
cancellationToken.WaitHandle.WaitOne(_checkInterval);
122+
}
123+
124+
private void AggregateCounters(CancellationToken cancellationToken)
125+
{
126+
foreach (var processedCounter in ProcessedCounters)
127+
{
128+
AggregateCounter(processedCounter);
129+
cancellationToken.ThrowIfCancellationRequested();
130+
}
131+
}
132+
133+
private void AggregateCounter(string counterName)
134+
{
135+
using (var connection = (PostgreSqlConnection)_storage.GetConnection())
136+
{
137+
using (var transaction = connection.Connection.BeginTransaction(IsolationLevel.ReadCommitted))
138+
{
139+
var aggregateQuery = $@"
140+
WITH counters AS (
141+
DELETE FROM ""{_options.SchemaName}"".""counter""
142+
WHERE ""key"" = '{counterName}'
143+
AND ""expireat"" IS NULL
144+
RETURNING *
145+
)
146+
147+
SELECT SUM(value) FROM counters;
148+
";
149+
150+
var aggregatedValue = connection.Connection.ExecuteScalar<long>(aggregateQuery, transaction: transaction);
151+
transaction.Commit();
152+
153+
if (aggregatedValue > 0)
154+
{
155+
var insertQuery = $@"INSERT INTO ""{_options.SchemaName}"".""counter""(""key"", ""value"") VALUES (@key, @value);";
156+
connection.Connection.Execute(insertQuery, new { key = counterName, value = aggregatedValue });
157+
}
158+
}
159+
}
160+
}
161+
}
162+
}

src/Hangfire.PostgreSql/Hangfire.PostgreSql.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626
<ItemGroup>
2727
<PackageReference Include="Dapper" Version="1.50.2" />
28-
<PackageReference Include="Hangfire.Core" Version="1.6.10" />
29-
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
30-
<PackageReference Include="Npgsql" Version="3.2.1" />
28+
<PackageReference Include="Hangfire.Core" Version="1.6.12" />
29+
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
30+
<PackageReference Include="Npgsql" Version="3.2.2" />
3131
</ItemGroup>
3232

3333
<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard1.4' ">
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
SET search_path = 'hangfire';
2+
--
3+
-- Table structure for table `Schema`
4+
--
5+
6+
DO
7+
$$
8+
BEGIN
9+
IF EXISTS (SELECT 1 FROM "schema" WHERE "version"::integer >= 7) THEN
10+
RAISE EXCEPTION 'version-already-applied';
11+
END IF;
12+
END
13+
$$;
14+
15+
ALTER TABLE "counter" ALTER COLUMN value TYPE bigint;
16+
ALTER TABLE "counter" DROP COLUMN updatecount RESTRICT;

0 commit comments

Comments
 (0)