Subscribing from a log offset that has been removed by TTL doesn't throw OffsetOutOfRangeException #521

@luoyuxia

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main (development)

Please describe the bug 🐞

When subscribing from a log offset that has already been deleted by the remote log TTL, I expect an OffsetOutOfRangeException to be thrown, but no exception is thrown.

This can be reproduced with the following IT:

public class FlussLogITCase {
    @RegisterExtension
    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
            FlussClusterExtension.builder()
                    .setNumOfTabletServers(3)
                    .setClusterConf(initConfig())
                    .build();

    private static Configuration initConfig() {
        Configuration conf = new Configuration();
        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("1kb"));
        conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ofSeconds(1));
        return conf;
    }

    protected Connection conn;
    protected Admin admin;
    protected Configuration clientConf;

    @BeforeEach
    protected void setup() throws Exception {
        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
        conn = ConnectionFactory.createConnection(clientConf);
        admin = conn.getAdmin();
    }

    @Test
    void testSubscribeTTLLog() throws Exception {
        TablePath tablePath = TablePath.of("test_db", "testSubscribeTTLLog");
        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(
                                Schema.newBuilder()
                                        .column("a", DataTypes.INT())
                                        .column("b", DataTypes.STRING())
                                        .build())
                        // use a 1s TTL so the log expires quickly
                        .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofSeconds(1))
                        .distributedBy(1)
                        .build();
        createTable(tablePath, tableDescriptor, false);
        try (Table table = conn.getTable(tablePath)) {
            AppendWriter appendWriter = table.newAppend().createWriter();
            for (int n = 0; n < 10; n++) {
                for (int i = 0; i < 10; i++) {
                    appendWriter.append(row(1, "a"));
                }
                appendWriter.flush();
            }

            // wait 5s for the log to be removed by TTL
            Thread.sleep(5_000);

            try (LogScanner logScanner = table.newScan().createLogScanner()) {
                // subscribe to bucket 0 from offset 2, which should already be expired
                logScanner.subscribe(0, 2);
                while (true) {
                    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
                    scanRecords.iterator().forEachRemaining(System.out::println);
                }
            }
        }
    }

    protected long createTable(
            TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfExists)
            throws Exception {
        admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, ignoreIfExists)
                .get();
        admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
        return admin.getTableInfo(tablePath).get().getTableId();
    }
}
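
The behavior I expect can be sketched independently of Fluss as a simple range check. This is only a toy model under stated assumptions, not the Fluss implementation: `ToyLog`, `expireBefore`, `fetch`, and the local `OffsetOutOfRangeException` class are hypothetical names made up for illustration. The idea is that once TTL cleanup advances the log start offset, a fetch below that offset should fail loudly instead of returning nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRangeSketch {
    /** Hypothetical stand-in for the exception named in this issue. */
    static class OffsetOutOfRangeException extends RuntimeException {
        OffsetOutOfRangeException(String message) {
            super(message);
        }
    }

    /** Toy single-bucket log that retains offsets in [logStartOffset, logEndOffset). */
    static class ToyLog {
        private final List<String> records = new ArrayList<>();
        private long logStartOffset = 0;

        void append(String value) {
            records.add(value);
        }

        long logEndOffset() {
            return logStartOffset + records.size();
        }

        /** Simulates TTL cleanup: drops every record below newStartOffset. */
        void expireBefore(long newStartOffset) {
            long target = Math.min(newStartOffset, logEndOffset());
            while (logStartOffset < target) {
                records.remove(0);
                logStartOffset++;
            }
        }

        /** Returns all retained records from offset onwards, or throws if offset is out of range. */
        List<String> fetch(long offset) {
            if (offset < logStartOffset || offset > logEndOffset()) {
                throw new OffsetOutOfRangeException(
                        "offset " + offset + " is outside ["
                                + logStartOffset + ", " + logEndOffset() + "]");
            }
            int from = (int) (offset - logStartOffset);
            return new ArrayList<>(records.subList(from, records.size()));
        }
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        for (int i = 0; i < 10; i++) {
            log.append("a");
        }
        // pretend TTL removed offsets 0..4
        log.expireBefore(5);
        try {
            log.fetch(2);
            System.out.println("no exception was thrown");
        } catch (OffsetOutOfRangeException e) {
            System.out.println("expected failure: " + e.getMessage());
        }
    }
}
```

In the IT above, the equivalent check would be that polling after `logScanner.subscribe(0, 2)` fails with OffsetOutOfRangeException rather than looping forever with empty results.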

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
