Skip to content

Commit e8ef7f8

Browse files
npawlenkoRakambda
andauthored
Add PostgreSql database support (#1159)
## Pull Request Etiquette ### Checklist - [x] Tests have been added in relevant areas - [x] Corresponding changes made to the documentation (README.adoc) ### Type of change Internal change ## Description Added postgresql database support --------- Co-authored-by: Thomas Couchoud <1688389+Rakambda@users.noreply.github.com>
1 parent a744ea4 commit e8ef7f8

11 files changed

Lines changed: 476 additions & 105 deletions

File tree

gradle/libs.versions.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ hikari-cp-version = "7.0.2"
2121
mariadb-version = "3.5.6"
2222
sqlite-version = "3.50.3.0"
2323
mysql-version = "9.4.0"
24+
postgresql-version = "42.7.8"
2425
rerunner-jupiter-version = "2.1.6"
2526
flyway-version = "11.14.0"
2627
selenide-version = "7.11.1"
@@ -64,8 +65,10 @@ hikaricp = { group = "com.zaxxer", name = "HikariCP", version.ref = "hikari-cp-v
6465
mariadb = { group = "org.mariadb.jdbc", name = "mariadb-java-client", version.ref = "mariadb-version" }
6566
sqlite = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-version" }
6667
mysql = { group = "com.mysql", name = "mysql-connector-j", version.ref = "mysql-version" }
68+
postgresql = { group = "org.postgresql", name = "postgresql", version.ref = "postgresql-version" }
6769
flyway-core = { group = "org.flywaydb", name = "flyway-core", version.ref = "flyway-version" }
6870
flyway-mysql = { group = "org.flywaydb", name = "flyway-mysql", version.ref = "flyway-version" }
71+
flyway-postgresql = { group = "org.flywaydb", name = "flyway-database-postgresql", version.ref = "flyway-version"}
6972
selenide = { group = "com.codeborne", name = "selenide", version.ref = "selenide-version" }
7073
selenide-proxy = { group = "com.codeborne", name = "selenide-proxy", version.ref = "selenide-version" }
7174
lombok = { group = "org.projectlombok", name = "lombok", version.ref = "lombok-version" }
@@ -88,7 +91,7 @@ rerunnerJupiter = { group = "io.github.artsok", name = "rerunner-jupiter", versi
8891
jackson = ["jackson-core", "jackson-annotations", "jackson-databind", "jackson-jsr310"]
8992
log4j2 = ["log4j2-core", "log4j2-slf4j", "log4j2-json"]
9093
unirest = ["unirest-java", "unirest-modules-jackson"]
91-
flyway = ["flyway-core", "flyway-mysql"]
94+
flyway = ["flyway-core", "flyway-mysql", "flyway-postgresql"]
9295
selenide = ["selenide", "selenide-proxy"]
9396
jsonschemaGenerator = ["jsonschema-generator", "jsonschema-module-jackson"]
9497
junit = ["junit-api", "junit-params", "junit-engine"]

miner/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies {
3232
implementation(libs.mariadb)
3333
implementation(libs.sqlite)
3434
implementation(libs.mysql)
35+
implementation(libs.postgresql)
3536
implementation(libs.bundles.flyway)
3637
implementation(libs.jSpecify)
3738

miner/docs/modules/ROOT/pages/configuration/global.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,5 @@ You'll therefore have to adjust the settings for each mined account to not point
6060
link:https://www.baeldung.com/java-jdbc-url-format[JDBC url]:
6161

6262
* MariaDB: `jdbc:mariadb://host:port/database` (great if you have a DB available or running inside docker as you can set a mariadb container)
63-
* SQLite: `jdbc:sqlite:/path/to/file` (great running locally and want to store it to a file, however less resilient and more prone to corruption).
63+
* PostgreSQL: `jdbc:postgresql://host:port/database` (just like MariaDB, reliable and efficient for both local and Docker setups)
64+
* SQLite: `jdbc:sqlite:/path/to/file` (great running locally and want to store it to a file, however less resilient and more prone to corruption).

miner/src/main/java/fr/rakambda/channelpointsminer/miner/database/MariaDBDatabase.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected void resolveUserPredictions(double returnRatioForWin, @NonNull String
5555
SET
5656
`PredictionCnt`=`PredictionCnt`+1,
5757
`WinCnt`=`WinCnt`+?,
58-
`WinRate`=`WinCnt`/`PredictionCnt`,
58+
`WinRate`=(`WinCnt` + ?)/`PredictionCnt`,
5959
`ReturnOnInvestment`=`ReturnOnInvestment`+?
6060
WHERE `ID`=? AND `ChannelID`=?""")
6161
){
@@ -65,20 +65,17 @@ protected void resolveUserPredictions(double returnRatioForWin, @NonNull String
6565
try(var result = getOpenPredictionStmt.executeQuery()){
6666
while(result.next()){
6767
var userPrediction = Converters.convertUserPrediction(result);
68-
if(badge.equals(userPrediction.getBadge())){
69-
updatePredictionUserStmt.setInt(1, 1);
70-
updatePredictionUserStmt.setDouble(2, returnOnInvestment);
71-
}
72-
else{
73-
updatePredictionUserStmt.setInt(1, 0);
74-
updatePredictionUserStmt.setDouble(2, -1);
75-
}
76-
updatePredictionUserStmt.setInt(3, userPrediction.getUserId());
77-
updatePredictionUserStmt.setString(4, userPrediction.getChannelId());
68+
boolean isWinner = badge.equals(userPrediction.getBadge());
69+
70+
updatePredictionUserStmt.setInt(1, isWinner ? 1 : 0);
71+
updatePredictionUserStmt.setInt(2, isWinner ? 1 : 0);
72+
updatePredictionUserStmt.setDouble(3, isWinner ? returnOnInvestment : -1);
73+
updatePredictionUserStmt.setInt(4, userPrediction.getUserId());
74+
updatePredictionUserStmt.setString(5, userPrediction.getChannelId());
7875
updatePredictionUserStmt.addBatch();
7976
}
8077
updatePredictionUserStmt.executeBatch();
8178
}
8279
}
8380
}
84-
}
81+
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/database/MysqlDatabase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
import javax.sql.DataSource;
44

55
public class MysqlDatabase extends MariaDBDatabase{
6-
public MysqlDatabase(DataSource dataSource){
7-
super(dataSource);
8-
}
9-
10-
@Override
11-
public void initDatabase(){
12-
applyFlyway("db/migrations/mysql");
13-
}
6+
public MysqlDatabase(DataSource dataSource){
7+
super(dataSource);
8+
}
9+
10+
@Override
11+
public void initDatabase(){
12+
applyFlyway("db/migrations/mysql");
13+
}
1414
}
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package fr.rakambda.channelpointsminer.miner.database;
2+
3+
import fr.rakambda.channelpointsminer.miner.api.pubsub.data.message.subtype.Event;
4+
import fr.rakambda.channelpointsminer.miner.database.converter.Converters;
5+
import fr.rakambda.channelpointsminer.miner.database.model.prediction.OutcomeStatistic;
6+
import fr.rakambda.channelpointsminer.miner.factory.TimeFactory;
7+
import lombok.extern.log4j.Log4j2;
8+
import org.jspecify.annotations.NonNull;
9+
import org.jspecify.annotations.Nullable;
10+
import javax.sql.DataSource;
11+
import java.sql.Connection;
12+
import java.sql.PreparedStatement;
13+
import java.sql.SQLException;
14+
import java.sql.Timestamp;
15+
import java.time.Instant;
16+
import java.time.ZonedDateTime;
17+
import java.util.Collection;
18+
import java.util.LinkedList;
19+
import java.util.Optional;
20+
21+
@Log4j2
22+
public class PostgreSqlDatabase extends BaseDatabase{
23+
public PostgreSqlDatabase(DataSource dataSource){
24+
super(dataSource);
25+
}
26+
27+
@Override
28+
public void initDatabase(){
29+
applyFlyway("db/migrations/postgresql");
30+
}
31+
32+
@Override
33+
protected void addUserPrediction(@NonNull String channelId, int userId, @NonNull String badge) throws SQLException{
34+
try(var conn = getConnection();
35+
var statement = conn.prepareStatement("""
36+
INSERT INTO UserPrediction(ChannelID, UserID, Badge)
37+
VALUES (?, ?, ?)
38+
ON CONFLICT (ChannelID, UserID) DO NOTHING;
39+
""")){
40+
41+
statement.setString(1, channelId);
42+
statement.setInt(2, userId);
43+
statement.setString(3, badge);
44+
statement.executeUpdate();
45+
}
46+
}
47+
48+
@Override
49+
protected void resolveUserPredictions(double returnRatioForWin, @NonNull String channelId, @NonNull String badge) throws SQLException{
50+
try(var conn = getConnection();
51+
var getOpenPredictionStmt = conn.prepareStatement("""
52+
SELECT UserID, ChannelID, Badge
53+
FROM UserPrediction
54+
WHERE ChannelID = ?;
55+
""");
56+
var updatePredictionUserStmt = conn.prepareStatement("""
57+
UPDATE PredictionUser
58+
SET
59+
PredictionCnt = PredictionCnt + 1,
60+
WinCnt = WinCnt + ?,
61+
WinRate = (WinCnt + ?)::FLOAT / (PredictionCnt + 1),
62+
ReturnOnInvestment = ReturnOnInvestment + ?
63+
WHERE ID = ? AND ChannelID = ?;
64+
""")){
65+
66+
double returnOnInvestment = returnRatioForWin - 1;
67+
68+
getOpenPredictionStmt.setString(1, channelId);
69+
try(var result = getOpenPredictionStmt.executeQuery()){
70+
while(result.next()){
71+
var userPrediction = Converters.convertUserPrediction(result);
72+
73+
boolean isWinner = badge.equals(userPrediction.getBadge());
74+
updatePredictionUserStmt.setInt(1, isWinner ? 1 : 0);
75+
updatePredictionUserStmt.setInt(2, isWinner ? 1 : 0);
76+
updatePredictionUserStmt.setDouble(3, isWinner ? returnOnInvestment : -1);
77+
updatePredictionUserStmt.setInt(4, userPrediction.getUserId());
78+
updatePredictionUserStmt.setString(5, userPrediction.getChannelId());
79+
updatePredictionUserStmt.addBatch();
80+
}
81+
updatePredictionUserStmt.executeBatch();
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public void createChannel(@NonNull String channelId, @NonNull String username) throws SQLException{
88+
try(var conn = getConnection();
89+
var statement = conn.prepareStatement("""
90+
INSERT INTO Channel(ID, Username, LastStatusChange)
91+
VALUES (?, ?, NOW())
92+
ON CONFLICT (ID) DO NOTHING;
93+
""")){
94+
95+
statement.setString(1, channelId);
96+
statement.setString(2, username);
97+
statement.executeUpdate();
98+
}
99+
}
100+
101+
@Override
102+
public void deleteAllUserPredictions() throws SQLException{
103+
log.debug("Removing all user predictions.");
104+
try(var conn = getConnection();
105+
var statement = conn.prepareStatement("DELETE FROM UserPrediction")){
106+
statement.executeUpdate();
107+
}
108+
}
109+
110+
@Override
111+
public void deleteUserPredictionsForChannel(@NonNull String channelId) throws SQLException{
112+
log.debug("Removing user predictions for channelId '{}'.", channelId);
113+
try(var conn = getConnection();
114+
var statement = getDeleteUserPredictionsForChannelStmt(conn)){
115+
statement.setString(1, channelId);
116+
117+
statement.executeUpdate();
118+
}
119+
}
120+
121+
@NonNull
122+
private PreparedStatement getDeleteUserPredictionsForChannelStmt(@NonNull Connection conn) throws SQLException{
123+
return conn.prepareStatement("""
124+
DELETE FROM UserPrediction
125+
WHERE ChannelID=?;"""
126+
);
127+
}
128+
129+
@Override
130+
public @NonNull Optional<String> getStreamerIdFromName(@NonNull String channelName) throws SQLException{
131+
try(var conn = getConnection();
132+
var statement = conn.prepareStatement("SELECT ID FROM Channel WHERE Username = ?;")){
133+
statement.setString(1, channelName);
134+
try(var result = statement.executeQuery()){
135+
if(result.next()){
136+
return Optional.ofNullable(result.getString("ID"));
137+
}
138+
return Optional.empty();
139+
}
140+
}
141+
}
142+
143+
@Override
144+
public @NonNull Collection<OutcomeStatistic> getOutcomeStatisticsForChannel(@NonNull String channelId, int minBetsPlacedByUser) throws SQLException{
145+
try(var conn = getConnection();
146+
var statement = conn.prepareStatement("""
147+
SELECT up.Badge,
148+
COUNT(up.UserID) AS UserCnt,
149+
AVG(pu.WinRate) AS "AvgWinRate",
150+
AVG(pu.PredictionCnt) AS AvgUserBetsPlaced,
151+
AVG(pu.WinCnt) AS AvgUserWins,
152+
AVG(pu.ReturnOnInvestment) AS AvgReturnOnInvestment
153+
FROM UserPrediction up
154+
INNER JOIN PredictionUser pu
155+
ON up.UserID = pu.ID AND up.ChannelID = pu.ChannelID
156+
WHERE up.ChannelID = ?
157+
AND pu.PredictionCnt >= ?
158+
GROUP BY up.Badge;
159+
""")){
160+
statement.setString(1, channelId);
161+
statement.setInt(2, minBetsPlacedByUser);
162+
163+
var results = new LinkedList<OutcomeStatistic>();
164+
try(var rs = statement.executeQuery()){
165+
while(rs.next()){
166+
results.add(Converters.convertOutcomeTrust(rs));
167+
}
168+
}
169+
return results;
170+
}
171+
}
172+
173+
@Override
174+
public void resolvePrediction(@NonNull Event event, @NonNull String outcome, @NonNull String badge, double returnRatioForWin) throws SQLException{
175+
var ended = Optional.ofNullable(event.getEndedAt()).map(ZonedDateTime::toInstant).orElseGet(TimeFactory::now);
176+
177+
resolveUserPredictions(returnRatioForWin, event.getChannelId(), badge);
178+
179+
try(var conn = getConnection();
180+
var statement = conn.prepareStatement("""
181+
INSERT INTO ResolvedPrediction(EventID, ChannelID,Title,EventCreated, EventEnded, Canceled,Outcome, Badge, ReturnRatioForWin)
182+
VALUES (?, ?, ?, ?, ?, false, ?, ?, ?)
183+
ON CONFLICT (EventID) DO NOTHING;
184+
""")){
185+
186+
statement.setString(1, event.getId());
187+
statement.setString(2, event.getChannelId());
188+
statement.setString(3, event.getTitle());
189+
statement.setTimestamp(4, Timestamp.from(event.getCreatedAt().toInstant()));
190+
statement.setTimestamp(5, Timestamp.from(ended));
191+
statement.setString(6, outcome);
192+
statement.setString(7, badge);
193+
statement.setDouble(8, returnRatioForWin);
194+
statement.executeUpdate();
195+
}
196+
197+
deleteUserPredictionsForChannel(event.getChannelId());
198+
}
199+
200+
@Override
201+
public void cancelPrediction(@NonNull Event event) throws SQLException{
202+
var ended = Optional.ofNullable(event.getEndedAt()).map(ZonedDateTime::toInstant).orElseGet(TimeFactory::now);
203+
204+
try(var conn = getConnection();
205+
var statement = conn.prepareStatement("""
206+
INSERT INTO ResolvedPrediction(EventID, ChannelID, Title, EventCreated, EventEnded, Canceled)
207+
VALUES (?, ?, ?, ?, ?, true)
208+
ON CONFLICT (EventID) DO NOTHING;
209+
""")){
210+
statement.setString(1, event.getId());
211+
statement.setString(2, event.getChannelId());
212+
statement.setString(3, event.getTitle());
213+
statement.setTimestamp(4, Timestamp.from(event.getCreatedAt().toInstant()));
214+
statement.setTimestamp(5, Timestamp.from(ended));
215+
statement.executeUpdate();
216+
}
217+
218+
deleteUserPredictionsForChannel(event.getChannelId());
219+
}
220+
221+
@Override
222+
public void addPrediction(@NonNull String channelId, @NonNull String eventId, @NonNull String type, @NonNull String description, @NonNull Instant instant) throws SQLException{
223+
try(var conn = getConnection();
224+
var statement = conn.prepareStatement("""
225+
INSERT INTO Prediction(ChannelID, EventID, EventDate, Type, Description)
226+
VALUES (?, ?, ?, ?, ?);
227+
""")){
228+
statement.setString(1, channelId);
229+
statement.setString(2, eventId);
230+
statement.setTimestamp(3, Timestamp.from(instant));
231+
statement.setString(4, type);
232+
statement.setString(5, description);
233+
statement.executeUpdate();
234+
}
235+
}
236+
237+
@Override
238+
public void addBalance(@NonNull String channelId, int balance, @Nullable String reason, @NonNull Instant instant) throws SQLException{
239+
try(var conn = getConnection();
240+
var statement = conn.prepareStatement("""
241+
INSERT INTO Balance(ChannelID, BalanceDate, Balance, Reason)
242+
VALUES (?, ?, ?, ?);
243+
""")){
244+
statement.setString(1, channelId);
245+
statement.setTimestamp(2, Timestamp.from(instant));
246+
statement.setInt(3, balance);
247+
statement.setString(4, reason);
248+
statement.executeUpdate();
249+
}
250+
}
251+
252+
@Override
253+
public void updateChannelStatusTime(@NonNull String channelId, @NonNull Instant instant) throws SQLException{
254+
try(var conn = getConnection();
255+
var statement = conn.prepareStatement("""
256+
UPDATE Channel
257+
SET LastStatusChange = ?
258+
WHERE ID = ?;
259+
""")){
260+
statement.setTimestamp(1, Timestamp.from(instant));
261+
statement.setString(2, channelId);
262+
statement.executeUpdate();
263+
}
264+
}
265+
}

miner/src/main/java/fr/rakambda/channelpointsminer/miner/database/SQLiteDatabase.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,10 @@ WITH wi AS (SELECT ? AS n)
7171
try(var result = getOpenPredictionStmt.executeQuery()){
7272
while(result.next()){
7373
var userPrediction = Converters.convertUserPrediction(result);
74-
if(badge.equals(userPrediction.getBadge())){
75-
updatePredictionUserStmt.setInt(1, 1);
76-
updatePredictionUserStmt.setDouble(2, returnOnInvestment);
77-
}
78-
else{
79-
updatePredictionUserStmt.setInt(1, 0);
80-
updatePredictionUserStmt.setDouble(2, -1);
81-
}
74+
boolean isWinner = badge.equals(userPrediction.getBadge());
75+
76+
updatePredictionUserStmt.setInt(1, isWinner ? 1 : 0);
77+
updatePredictionUserStmt.setDouble(2, isWinner ? returnOnInvestment : -1);
8278
updatePredictionUserStmt.setInt(3, userPrediction.getUserId());
8379
updatePredictionUserStmt.setString(4, userPrediction.getChannelId());
8480
updatePredictionUserStmt.addBatch();

0 commit comments

Comments
 (0)