Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a6a925f
feat(distribution): version 0.0.1
perctrix Apr 20, 2025
5862ec2
fix: add quotes around key-prefix in Redis configuration
perctrix Apr 20, 2025
83980da
edited version number
perctrix Apr 21, 2025
68e0278
feat: Implement Outbox pattern for reliable messaging
perctrix Apr 21, 2025
439910b
fix: Replace @PostConstruct with @EventListener for application initi…
perctrix Apr 21, 2025
a0df18f
fix: Update RedisStreamManager to use getTotalPendingMessages for pen…
perctrix Apr 21, 2025
46af207
fix: Update RedisStreamManager to use size() for pending message coun…
perctrix Apr 21, 2025
8a180d8
Merge branch 'halo-dev:main' into main
perctrix Apr 21, 2025
af18206
feat: 添加分布式健康检查功能,集成Redis通信监控并配置健康指示器
perctrix Apr 21, 2025
7b92972
Merge branch 'halo-dev:main' into main
perctrix Apr 22, 2025
d1dca2f
Remove package management files and dependencies
perctrix Apr 22, 2025
6772ce1
Merge branch 'main' of https://github.com/amplimit/halo
perctrix Apr 22, 2025
28468cf
Remove pnpm package files and documentation
perctrix Apr 22, 2025
b353f0e
Merge branch 'main' into main
perctrix Apr 22, 2025
2ccd905
添加应用程序配置文件和数据库模式以支持 Outbox 模式
perctrix Apr 23, 2025
adb56af
Merge branch 'halo-dev:main' into main
perctrix Apr 25, 2025
784ce9d
Add PostgreSQL and Redis config for distributed deployment
perctrix Apr 25, 2025
cb76439
Merge branch 'halo-dev:main' into main
perctrix Apr 25, 2025
5b7da46
Add initial application.yaml with server and distributed deployment c…
perctrix Apr 25, 2025
2cc672d
添加对平台应用程序的注解处理器依赖
perctrix Apr 25, 2025
9b16801
Merge branch 'main' into main
perctrix Apr 25, 2025
8acb0be
Merge branch 'halo-dev:main' into main
perctrix Apr 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
server:
port: 8090
spring:
r2dbc:
url: r2dbc:pool:postgresql://170.106.99.35:5432/halo-test
username: admin
password: 123456
sql:
init:
mode: always
platform: postgresql
schema-locations:
- classpath:schema-postgresql.sql
- classpath:schema-postgresql-outbox.sql
# 使用正确的 Redis 配置路径和格式
data:
redis:
host: localhost
port: 6379
password: 123456
timeout: 30000
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: -1ms
# 添加客户端名称
client-name: halo-client
cache:
type: redis
redis:
time-to-live: 3600000
key-prefix: "halo:"
halo:
work-dir: ${user.home}/.halo2
external-url: http://localhost:8090
attachment:
resource-mappings:
- pathPattern: /upload/**
locations:
- migrate-from-1.x
distributed:
enabled: true
stream-key: halo:distributed:stream
consumer-group: halo-consumer-group
scheduler-lock-prefix: "halo:scheduler-lock:"
listener-interval: 1000
outbox-processor-interval: 5000
7 changes: 6 additions & 1 deletion application/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ tasks.named('jar') {

dependencies {
implementation project(':api')
// Redis for distributed messaging and caching
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// ShedLock for distributed scheduling
implementation 'net.javacrumbs.shedlock:shedlock-spring:5.12.0'
implementation 'net.javacrumbs.shedlock:shedlock-provider-redis-spring:5.12.0'

annotationProcessor platform(project(':platform:application'))

annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
annotationProcessor "org.springframework:spring-context-indexer"

Expand Down
2 changes: 2 additions & 0 deletions application/src/main/java/run/halo/app/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.cache.annotation.EnableCaching;
import run.halo.app.infra.properties.HaloProperties;

/**
Expand All @@ -17,6 +18,7 @@
* @date 2017-11-14
*/
@EnableScheduling
@EnableCaching
@SpringBootApplication(scanBasePackages = "run.halo.app", exclude =
IntegrationAutoConfiguration.class)
@EnableConfigurationProperties({HaloProperties.class})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.security.Principal;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,6 +27,7 @@
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.Ref;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.infra.messaging.RedisStreamEventPublisher;

/**
* Abstract Service for {@link Snapshot}.
Expand All @@ -34,10 +36,37 @@
* @since 2.0.0
*/
@Slf4j
@AllArgsConstructor
public abstract class AbstractContentService {

protected final RedisStreamEventPublisher eventPublisher;

private final ReactiveExtensionClient client;

public AbstractContentService(ReactiveExtensionClient client) {
this.client = client;
this.eventPublisher = null;
}

public AbstractContentService(ReactiveExtensionClient client, RedisStreamEventPublisher eventPublisher) {
this.client = client;
this.eventPublisher = eventPublisher;
}

/**
* Publish snapshot change event to notify other instances.
*
* @param snapshot The snapshot that was modified
* @param action The action performed (create, update, delete)
*/
protected void publishSnapshotEvent(Snapshot snapshot, String action) {
if (eventPublisher != null && snapshot != null) {
eventPublisher.publish(Map.of(
"entity", "snapshot",
"id", snapshot.getMetadata().getName(),
"action", action
));
}
}

public Mono<ContentWrapper> getContent(String snapshotName, String baseSnapshotName) {
if (StringUtils.isBlank(snapshotName) || StringUtils.isBlank(baseSnapshotName)) {
Expand Down Expand Up @@ -105,7 +134,8 @@ private Mono<Snapshot> create(@Nullable String baseSnapshotName,
})
.thenReturn(source)
)
.flatMap(client::create);
.flatMap(client::create)
.doOnSuccess(snap -> publishSnapshotEvent(snap, "create"));
}

protected Mono<ContentWrapper> updateContent(String baseSnapshotName,
Expand Down Expand Up @@ -133,6 +163,7 @@ protected Mono<ContentWrapper> updateContent(String baseSnapshotName,
.thenReturn(headSnapshot)
)
.flatMap(client::update)
.doOnSuccess(snapshot -> publishSnapshotEvent(snapshot, "update"))
)
.retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(throwable -> throwable instanceof OptimisticLockingFailureException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import run.halo.app.core.extension.content.Post;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.Ref;
import run.halo.app.infra.messaging.RedisStreamEventPublisher;

import java.util.Map;

/**
* Provides ability to get post content for the specified post.
Expand All @@ -16,10 +19,12 @@
@Component
public class PostContentServiceImpl extends AbstractContentService implements PostContentService {
private final ReactiveExtensionClient client;
private final RedisStreamEventPublisher eventPublisher;

public PostContentServiceImpl(ReactiveExtensionClient client) {
super(client);
public PostContentServiceImpl(ReactiveExtensionClient client, RedisStreamEventPublisher eventPublisher) {
super(client, eventPublisher);
this.client = client;
this.eventPublisher = eventPublisher;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import run.halo.app.infra.SystemConfigurableEnvironmentFetcher;
import run.halo.app.infra.exception.AccessDeniedException;
import run.halo.app.plugin.extensionpoint.ExtensionGetter;
import run.halo.app.infra.messaging.RedisStreamEventPublisher;
import java.util.Map;

/**
* Comment service implementation.
Expand All @@ -43,13 +45,15 @@ public class CommentServiceImpl extends AbstractCommentService implements Commen

private final ExtensionGetter extensionGetter;
private final SystemConfigurableEnvironmentFetcher environmentFetcher;
private final RedisStreamEventPublisher eventPublisher;

public CommentServiceImpl(RoleService roleService, ReactiveExtensionClient client,
UserService userService, CounterService counterService, ExtensionGetter extensionGetter,
SystemConfigurableEnvironmentFetcher environmentFetcher) {
SystemConfigurableEnvironmentFetcher environmentFetcher, RedisStreamEventPublisher eventPublisher) {
super(roleService, client, userService, counterService);
this.extensionGetter = extensionGetter;
this.environmentFetcher = environmentFetcher;
this.eventPublisher = eventPublisher;
}

@Override
Expand Down Expand Up @@ -106,7 +110,9 @@ public Mono<Comment> create(Comment comment) {
populateApproveState(populatedComment))
.thenReturn(populatedComment)
)
.flatMap(client::create);
.flatMap(client::create)
.doOnSuccess(c -> eventPublisher.publish(Map.of(
"entity", "comment", "id", c.getMetadata().getName(), "action", "create")));
}

private Mono<Void> populateApproveState(Comment comment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
import run.halo.app.extension.MetadataOperator;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.extension.Ref;
import run.halo.app.infra.messaging.DistributedEvent;
import run.halo.app.infra.messaging.OutboxService;
import run.halo.app.infra.messaging.RedisStreamEventPublisher;
import java.util.Map;
import java.util.HashMap;
import run.halo.app.extension.router.selector.FieldSelector;
import run.halo.app.infra.Condition;
import run.halo.app.infra.ConditionStatus;
Expand All @@ -61,14 +66,19 @@ public class PostServiceImpl extends AbstractContentService implements PostServi
private final CounterService counterService;
private final UserService userService;
private final CategoryService categoryService;
private final RedisStreamEventPublisher eventPublisher;
private final OutboxService outboxService;

public PostServiceImpl(ReactiveExtensionClient client, CounterService counterService,
UserService userService, CategoryService categoryService) {
UserService userService, CategoryService categoryService,
RedisStreamEventPublisher eventPublisher, OutboxService outboxService) {
super(client);
this.client = client;
this.counterService = counterService;
this.userService = userService;
this.categoryService = categoryService;
this.eventPublisher = eventPublisher;
this.outboxService = outboxService;
}

@Override
Expand Down Expand Up @@ -217,7 +227,18 @@ public Mono<Post> draftPost(PostRequest postRequest) {
);
})
.retryWhen(Retry.backoff(5, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
.filter(OptimisticLockingFailureException.class::isInstance))
.flatMap(post -> {
// Create a distributed event and save to outbox
DistributedEvent event = DistributedEvent.builder()
.type(DistributedEvent.EventType.POST_CREATED)
.entityId(post.getMetadata().getName())
.entityType("Post")
.operation("DRAFT")
.build();
return outboxService.saveEvent(event)
.thenReturn(post);
});
}

private Mono<Post> waitForPostToDraftConcludingWork(String postName,
Expand Down Expand Up @@ -311,13 +332,35 @@ public Mono<Post> publish(Post post) {
spec.setHeadSnapshot(spec.getBaseSnapshot());
}
spec.setReleaseSnapshot(spec.getHeadSnapshot());
return client.update(post);
return client.update(post)
.flatMap(updatedPost -> {
// Create a distributed event and save to outbox
DistributedEvent event = DistributedEvent.builder()
.type(DistributedEvent.EventType.POST_UPDATED)
.entityId(updatedPost.getMetadata().getName())
.entityType("Post")
.operation("PUBLISH")
.build();
return outboxService.saveEvent(event)
.thenReturn(updatedPost);
});
}

@Override
public Mono<Post> unpublish(Post post) {
post.getSpec().setPublish(false);
return client.update(post);
return client.update(post)
.flatMap(updatedPost -> {
// Create a distributed event and save to outbox
DistributedEvent event = DistributedEvent.builder()
.type(DistributedEvent.EventType.POST_UPDATED)
.entityId(updatedPost.getMetadata().getName())
.entityType("Post")
.operation("UNPUBLISH")
.build();
return outboxService.saveEvent(event)
.thenReturn(updatedPost);
});
}

@Override
Expand Down Expand Up @@ -391,7 +434,18 @@ public Mono<Post> recycleBy(String postName, String username) {
.flatMap(post -> updatePostWithRetry(post, record -> {
record.getSpec().setDeleted(true);
return record;
}));
}))
.flatMap(deletedPost -> {
// Create a distributed event and save to outbox
DistributedEvent event = DistributedEvent.builder()
.type(DistributedEvent.EventType.POST_DELETED)
.entityId(deletedPost.getMetadata().getName())
.entityType("Post")
.operation("RECYCLE")
.build();
return outboxService.saveEvent(event)
.thenReturn(deletedPost);
});
}

private Mono<Post> updatePostWithRetry(Post post, UnaryOperator<Post> func) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import run.halo.app.extension.Ref;
import run.halo.app.infra.Condition;
import run.halo.app.infra.ConditionStatus;
import run.halo.app.infra.messaging.RedisStreamEventPublisher;

import java.util.Map;

/**
* Single page service implementation.
Expand All @@ -50,13 +53,15 @@ public class SinglePageServiceImpl extends AbstractContentService implements Sin
private final ReactiveExtensionClient client;
private final CounterService counterService;
private final UserService userService;
private final RedisStreamEventPublisher eventPublisher;

public SinglePageServiceImpl(ReactiveExtensionClient client, CounterService counterService,
UserService userService) {
super(client);
UserService userService, RedisStreamEventPublisher eventPublisher) {
super(client, eventPublisher);
this.client = client;
this.counterService = counterService;
this.userService = userService;
this.eventPublisher = eventPublisher;
}

@Override
Expand Down Expand Up @@ -110,6 +115,15 @@ public Mono<SinglePage> draft(SinglePageRequest pageRequest) {
}
)
.flatMap(client::create)
.doOnSuccess(page -> {
if (eventPublisher != null) {
eventPublisher.publish(Map.of(
"entity", "singlepage",
"id", page.getMetadata().getName(),
"action", "create"
));
}
})
.flatMap(page -> {
var contentRequest =
new ContentRequest(Ref.of(page), page.getSpec().getHeadSnapshot(),
Expand Down Expand Up @@ -251,7 +265,16 @@ private Mono<SinglePage> publish(SinglePage singlePage) {
spec.setHeadSnapshot(spec.getBaseSnapshot());
}
spec.setReleaseSnapshot(spec.getHeadSnapshot());
return client.update(singlePage);
return client.update(singlePage)
.doOnSuccess(page -> {
if (eventPublisher != null) {
eventPublisher.publish(Map.of(
"entity", "singlepage",
"id", page.getMetadata().getName(),
"action", "update"
));
}
});
}

Mono<SinglePage> publishPageWithRetry(SinglePage page) {
Expand Down
Loading