forked from Azure/azure-sdk-for-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGetEventHubMetadata.java
61 lines (53 loc) · 2.94 KB
/
GetEventHubMetadata.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import java.util.concurrent.Semaphore;
/**
* Demonstrates how to fetch metadata from an Event Hub's partitions.
*/
public class GetEventHubMetadata {
/**
* Demonstrates how to get metadata from an Event Hub's partitions.
*
* @param args Unused arguments to the sample.
* @throws InterruptedException if the semaphore could not be acquired.
*/
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";
// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncClient();
// Acquiring the semaphore so that this sample does not end before all the partition properties are fetched.
semaphore.acquire();
// Querying the partition identifiers for the Event Hub. Then calling client.getPartitionProperties with the
// identifier to get information about each partition.
client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId))
.subscribe(properties -> {
System.out.println("The Event Hub has the following properties:");
System.out.printf(
"Event Hub Name: %s; Partition Id: %s; Is partition empty? %s; First Sequence Number: %s; "
+ "Last Enqueued Time: %s; Last Enqueued Sequence Number: %s; Last Enqueued Offset: %s",
properties.eventHubPath(), properties.id(), properties.isEmpty(),
properties.beginningSequenceNumber(),
properties.lastEnqueuedTime(),
properties.lastEnqueuedSequenceNumber(),
properties.lastEnqueuedOffset());
}, error -> {
System.err.println("Error occurred while fetching partition properties: " + error.toString());
}, () -> {
// Releasing the semaphore now that we've finished querying for partition properties.
semaphore.release();
});
System.out.println("Waiting for partition properties to complete...");
semaphore.acquire();
System.out.println("Finished.");
}
}