Skip to content

Commit 8064262

Browse files
authored
Merge pull request #87 from zalando-nakadi/spring-boot-2-test
Support Spring Boot 2
2 parents c2ba401 + bcebbd2 commit 8064262

File tree

22 files changed

+605
-251
lines changed

22 files changed

+605
-251
lines changed

README.md

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ The goal of this Spring Boot starter is to simplify the reliable integration bet
1616

1717
There are already [multiple clients for the Nakadi REST API](https://zalando.github.io/nakadi/manual.html#using_clients), but none of them solves the mentioned issues.
1818

19-
We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchonously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded.
19+
We solved them by persisting new events in a log table as part of the producing JDBC transaction. They will then be sent asynchronously to Nakadi after the transaction completed. If the transaction is rolled back, the events will vanish too. As a result, events will always be sent if and only if the transaction succeeded.
2020

2121
The Transmitter generates a strictly monotonically increasing event id that can be used for ordering the events during retrieval. It is not guaranteed, that events will be sent to Nakadi in the order they have been produced. If an event could not be sent to Nakadi, the library will periodically retry the transmission.
2222

@@ -40,7 +40,8 @@ You may of course always setup a fresh system with the newest version.
4040

4141
## Prerequisites
4242

43-
This library was tested with Spring Boot 1.5.3.RELEASE and relies on an existing configured PostgreSQL DataSource.
43+
This library was tested with Spring Boot 2.0.3.RELEASE and relies on an existing configured PostgreSQL DataSource.
44+
**If you are still using Spring Boot 1.x, please use versions < 20.0.0, they are still actively maintained ([Documentation](https://github.com/zalando-nakadi/nakadi-producer-spring-boot-starter/tree/spring-boot-1)).**
4445

4546
This library also uses:
4647

@@ -50,14 +51,14 @@ This library also uses:
5051
* jackson >= 2.7.0
5152
* (optional) Zalando's [tracer-spring-boot-starter](https://github.com/zalando/tracer)
5253
* (optional) Zalando's [tokens library](https://github.com/zalando/tokens) >= 0.10.0
53-
* Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though.
54+
* Please note that [tokens-spring-boot-starter](https://github.com/zalando-stups/spring-boot-zalando-stups-tokens) 0.10.0 comes with tokens 0.9.9, which is not enough. You can manually add tokens 0.10.0 with that starter, though. To be used in zalando's k8s environment, you must at least use 0.11.0.
5455

5556

5657
## Usage
5758

5859
### Setup
5960

60-
If you are using maven, include the library in your `pom.xml`:
61+
If you are using Maven, include the library in your `pom.xml`:
6162

6263
```xml
6364
<dependency>
@@ -81,7 +82,7 @@ public class Application {
8182
}
8283
```
8384

84-
The library uses flyway migrations to set up its own database schema `nakadi_events`.
85+
The library uses Flyway migrations to set up its own database schema `nakadi_events`.
8586

8687
### Nakadi communication configuration
8788

@@ -211,30 +212,43 @@ process step the event is reporting.
211212
### Event snapshots (optional)
212213

213214
A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
214-
It does not represent a change of the state of a resource, but a current snapshot of the state of the resource.
215+
It does not represent a change of the state of a resource, but a current snapshot of its state. It can be useful to
216+
bootstrap a new consumer or to recover from inconsistencies between sender and consumer after an incident.
215217

216218
You can create snapshot events programmatically (using EventLogWriter.fireSnapshotEvent), but usually snapshot event
217219
creation is a irregular, manually triggered maintenance task.
218220

219221
This library provides a Spring Boot Actuator endpoint named `snapshot_event_creation` that can be used to trigger a Snapshot for a given event type. Assuming your management port is set to `7979`,
220222

221-
GET localhost:7979/snapshot_event_creation
223+
GET localhost:7979/actuator/snapshot-event-creation
222224

223225
will return a list of all event types available for snapshot creation and
224226

225-
POST localhost:7979/snapshot_event_creation/my.event-type
227+
POST localhost:7979/actuator/snapshot-event-creation/my.event-type
226228

227-
will trigger a snapshot for the event type `my.event-type`. The (optional) request body is a "filter specifier".
229+
will trigger a snapshot for the event type `my.event-type`. You can change the port, the authentication scheme and the
230+
path prefix as part of your Spring Boot Actuator configuration.
228231

229-
This will only work if your application has configured spring-boot-actuator
232+
You can provide an optional filter specifier that will be passed to your application to implement any application
233+
specific event/entity filtering logic. It can be provided either as a query parameter called `filter`, or as a
234+
request body
235+
236+
{"filter":"myFilter"}
237+
238+
This endpoint will only work if your application includes spring-boot-actuator,
230239
```xml
231240
<dependency>
232241
<groupId>org.springframework.boot</groupId>
233242
<artifactId>spring-boot-starter-actuator</artifactId>
234243
</dependency>
235244
```
236-
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface. Otherwise (or if the generator is not for the event type you requested), the library will respond with an error message when you request a snapshot creation.
237-
The request body (the "filter specifier") of the trigger request will be passed as a string parameter to the SnapshotEventGenerator's `generateSnapshots` method.
245+
your `application.properties` includes
246+
```
247+
management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...`
248+
```
249+
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface.
250+
The optional filter specifier of the trigger request will be passed as a string parameter to the
251+
SnapshotEventGenerator's `generateSnapshots` method and may be null, if none is given.
238252
239253
We provide a `SimpleSnapshotEventGenerator` to ease bean creation using a more functional style:
240254
```java
@@ -269,13 +283,13 @@ tracer:
269283
By default, the library will pick up your flyway data source (or the primary data source if no flyway data source is
270284
configured), create its own schema and start setting up its tables in there. You can customize this process in two ways:
271285
272-
If you want to use a different data source for schema maintainence (for example to use a different username) and
273-
configuring the Spring flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate
286+
If you want to use a different data source for schema maintenance (for example to use a different username) and
287+
configuring the Spring Flyway datasource is not enough, your can define a spring bean of type `DataSource` and annotate
274288
it with `@NakadiProducerDataSource`.
275289

276-
You may also define a spring bean of type `FlywayCallback` and annotate it with `@NakadiProducerFlywayCallback`. The
277-
interface provide several hook into the schema management lifecycle that may, for example, be used to
278-
`SET ROLE migrator` before and `RESET ROLE` after each migration.
290+
You may also define a spring bean of type `NakadiProducerFlywayCallback`. The interface provides several hooks into the
291+
schema management lifecycle that may, for example, be used to `SET ROLE migrator` before and `RESET ROLE` after each
292+
migration.
279293

280294
### Test support
281295

nakadi-producer-spring-boot-starter/pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<parent>
1111
<groupId>org.zalando</groupId>
1212
<artifactId>nakadi-producer-reactor</artifactId>
13-
<version>4.2.0</version>
13+
<version>20.0.0-SNAPSHOT</version>
1414
</parent>
1515

1616
<artifactId>nakadi-producer-spring-boot-starter</artifactId>
@@ -102,6 +102,14 @@
102102
<artifactId>spring-boot-starter-web</artifactId>
103103
<scope>test</scope>
104104
</dependency>
105+
<dependency>
106+
<groupId>org.springframework.boot</groupId>
107+
<artifactId>spring-boot-autoconfigure</artifactId>
108+
</dependency>
109+
<dependency>
110+
<groupId>org.springframework.boot</groupId>
111+
<artifactId>spring-boot-actuator-autoconfigure</artifactId>
112+
</dependency>
105113
</dependencies>
106114

107115

nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/FlywayMigrator.java

Lines changed: 119 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,18 @@
44
import javax.sql.DataSource;
55

66
import org.flywaydb.core.Flyway;
7+
import org.flywaydb.core.api.MigrationInfo;
8+
import org.flywaydb.core.api.callback.BaseFlywayCallback;
79
import org.flywaydb.core.api.callback.FlywayCallback;
10+
import org.flywaydb.core.api.configuration.ConfigurationAware;
11+
import org.flywaydb.core.api.configuration.FlywayConfiguration;
812
import org.springframework.beans.factory.annotation.Autowired;
913
import org.springframework.boot.autoconfigure.flyway.FlywayDataSource;
1014
import org.springframework.boot.autoconfigure.flyway.FlywayProperties;
1115

16+
import java.sql.Connection;
17+
import java.util.List;
18+
1219
public class FlywayMigrator {
1320
@Autowired(required = false)
1421
@NakadiProducerFlywayDataSource
@@ -22,10 +29,9 @@ public class FlywayMigrator {
2229
private DataSource dataSource;
2330

2431
@Autowired(required = false)
25-
@NakadiProducerFlywayCallback
26-
private FlywayCallback callback;
32+
private List<NakadiProducerFlywayCallback> callbacks;
2733

28-
@Autowired
34+
@Autowired(required = false)
2935
private FlywayProperties flywayProperties;
3036

3137
@PostConstruct
@@ -34,7 +40,7 @@ public void migrateFlyway() {
3440

3541
if (this.nakadiProducerFlywayDataSource != null) {
3642
flyway.setDataSource(nakadiProducerFlywayDataSource);
37-
} else if (this.flywayProperties.isCreateDataSource()) {
43+
} else if (this.flywayProperties != null && this.flywayProperties.isCreateDataSource()) {
3844
flyway.setDataSource(this.flywayProperties.getUrl(), this.flywayProperties.getUser(),
3945
this.flywayProperties.getPassword(),
4046
this.flywayProperties.getInitSqls().toArray(new String[0]));
@@ -46,11 +52,118 @@ public void migrateFlyway() {
4652

4753
flyway.setLocations("classpath:db_nakadiproducer/migrations");
4854
flyway.setSchemas("nakadi_events");
49-
if (callback != null) {
50-
flyway.setCallbacks(callback);
55+
if (callbacks != null) {
56+
flyway.setCallbacks(callbacks.stream().map(FlywayCallbackAdapter::new).toArray(FlywayCallback[]::new));
5157
}
58+
5259
flyway.setBaselineOnMigrate(true);
5360
flyway.setBaselineVersionAsString("2133546886.1.0");
5461
flyway.migrate();
5562
}
63+
64+
private static class FlywayCallbackAdapter extends BaseFlywayCallback {
65+
66+
private NakadiProducerFlywayCallback callback;
67+
68+
private FlywayCallbackAdapter(NakadiProducerFlywayCallback callback) {
69+
this.callback = callback;
70+
}
71+
72+
@Override
73+
public void setFlywayConfiguration(FlywayConfiguration flywayConfiguration) {
74+
if (callback instanceof ConfigurationAware) {
75+
((ConfigurationAware) callback).setFlywayConfiguration(flywayConfiguration);
76+
}
77+
}
78+
79+
@Override
80+
public void beforeClean(Connection connection) {
81+
callback.beforeClean(connection);
82+
}
83+
84+
@Override
85+
public void afterClean(Connection connection) {
86+
callback.afterClean(connection);
87+
}
88+
89+
@Override
90+
public void beforeMigrate(Connection connection) {
91+
callback.beforeMigrate(connection);
92+
}
93+
94+
@Override
95+
public void afterMigrate(Connection connection) {
96+
callback.afterMigrate(connection);
97+
}
98+
99+
@Override
100+
public void beforeEachMigrate(Connection connection, MigrationInfo info) {
101+
callback.beforeEachMigrate(connection, info);
102+
}
103+
104+
@Override
105+
public void afterEachMigrate(Connection connection, MigrationInfo info) {
106+
callback.afterEachMigrate(connection, info);
107+
}
108+
109+
@Override
110+
public void beforeUndo(Connection connection) {
111+
callback.beforeUndo(connection);
112+
}
113+
114+
@Override
115+
public void beforeEachUndo(Connection connection, MigrationInfo info) {
116+
callback.beforeEachUndo(connection, info);
117+
}
118+
119+
@Override
120+
public void afterEachUndo(Connection connection, MigrationInfo info) {
121+
callback.afterEachUndo(connection, info);
122+
}
123+
124+
@Override
125+
public void afterUndo(Connection connection) {
126+
callback.afterUndo(connection);
127+
}
128+
129+
@Override
130+
public void beforeValidate(Connection connection) {
131+
callback.beforeValidate(connection);
132+
}
133+
134+
@Override
135+
public void afterValidate(Connection connection) {
136+
callback.afterValidate(connection);
137+
}
138+
139+
@Override
140+
public void beforeBaseline(Connection connection) {
141+
callback.beforeBaseline(connection);
142+
}
143+
144+
@Override
145+
public void afterBaseline(Connection connection) {
146+
callback.afterBaseline(connection);
147+
}
148+
149+
@Override
150+
public void beforeRepair(Connection connection) {
151+
callback.beforeRepair(connection);
152+
}
153+
154+
@Override
155+
public void afterRepair(Connection connection) {
156+
callback.afterRepair(connection);
157+
}
158+
159+
@Override
160+
public void beforeInfo(Connection connection) {
161+
callback.beforeInfo(connection);
162+
}
163+
164+
@Override
165+
public void afterInfo(Connection connection) {
166+
callback.afterInfo(connection);
167+
}
168+
}
56169
}

0 commit comments

Comments
 (0)