-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathDataStreamChannel.cs
89 lines (77 loc) · 4.03 KB
/
DataStreamChannel.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System.Collections.Generic;
using System.Linq;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
namespace Elastic.Ingest.Elasticsearch.DataStreams;
/// <summary> A channel to push messages to Elasticsearch data streams </summary>
/// <remarks>
/// The bulk endpoint used by this channel is computed once in the constructor by prefixing the
/// base channel's bulk path with the configured data stream name.
/// </remarks>
public class DataStreamChannel<TEvent> : ElasticsearchChannelBase<TEvent, DataStreamChannelOptions<TEvent>>
    where TEvent : class
{
    // "<data stream>/<base bulk path and query>", built once at construction time.
    private readonly string _url;

    /// <inheritdoc cref="DataStreamChannel{TEvent}"/>
    public DataStreamChannel(DataStreamChannelOptions<TEvent> options) : this(options, null) { }

    /// <inheritdoc cref="DataStreamChannel{TEvent}"/>
    public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners, string? diagnosticsName = null)
        : base(options, callbackListeners, diagnosticsName ?? nameof(DataStreamChannel<TEvent>))
    {
        var dataStream = Options.DataStream.ToString();
        // `base.` is deliberate: this type overrides BulkPathAndQuery to return _url, which is
        // still unassigned at this point, so the base implementation must be read here.
        _url = $"{dataStream}/{base.BulkPathAndQuery}";
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns <see cref="HeaderSerializationStrategy.CreateNoParams"/> with a <c>null</c> header when
    /// no dynamic templates are supplied for <paramref name="event"/> and listing executed pipelines
    /// is either not configured or <c>false</c>. Otherwise returns
    /// <see cref="HeaderSerializationStrategy.Create"/> with a <see cref="BulkHeader"/> carrying both values.
    /// </remarks>
    protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event)
    {
        // Both lookups are optional callbacks on the options; a missing callback yields null.
        var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event);
        var templates = Options.DynamicTemplateLookup?.Invoke(@event);

        // Nothing to put in the header: no templates and pipelines listing is null or false.
        if (templates is null && listExecutedPipelines is null or false)
            return (HeaderSerializationStrategy.CreateNoParams, null);

        var header = new BulkHeader
        {
            DynamicTemplates = templates,
            ListExecutedPipelines = listExecutedPipelines
        };
        return (HeaderSerializationStrategy.Create, header);
    }

    /// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateName"/>
    protected override string TemplateName => Options.DataStream.GetTemplateName();

    /// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateWildcard"/>
    protected override string TemplateWildcard => Options.DataStream.GetNamespaceWildcard();

    /// <inheritdoc cref="ElasticsearchChannelBase{TEvent, TChannelOptions}.BulkPathAndQuery"/>
    protected override string BulkPathAndQuery => _url;

    /// <summary>
    /// Gets a default index template for the current <see cref="DataStreamChannel{TEvent}"/>
    /// </summary>
    /// <param name="name">The index template name; returned unchanged as the first tuple element.</param>
    /// <param name="match">The pattern written as the single entry of <c>index_patterns</c>.</param>
    /// <param name="mappingsName">The first component template listed in <c>composed_of</c>.</param>
    /// <param name="settingsName">The second component template listed in <c>composed_of</c>.</param>
    /// <returns>A tuple of (name, body) describing the index template</returns>
    /// <remarks>
    /// The component templates returned by <see cref="GetInferredComponentTemplates"/> are appended
    /// to <c>composed_of</c> after <paramref name="mappingsName"/> and <paramref name="settingsName"/>.
    /// Values are interpolated into the JSON body as-is, without JSON escaping.
    /// </remarks>
    protected override (string, string) GetDefaultIndexTemplate(string name, string match, string mappingsName, string settingsName)
    {
        var additionalComponents = GetInferredComponentTemplates();
        // Render each inferred component as a quoted JSON string, comma separated.
        var additionalComponentsJson = string.Join(", ", additionalComponents.Select(a => $"\"{a}\""));

        // Verbatim literal: the continuation lines are part of the string, so they stay flush-left
        // to keep the emitted body unchanged.
        var indexTemplateBody = @$"{{
""index_patterns"": [""{match}""],
""data_stream"": {{ }},
""composed_of"": [ ""{mappingsName}"", ""{settingsName}"", {additionalComponentsJson} ],
""priority"": 201,
""_meta"": {{
""description"": ""Template installed by .NET ingest libraries (https://github.com/elastic/elastic-ingest-dotnet)"",
""assembly_version"": ""{LibraryVersion.Current}""
}}
}}";
        return (name, indexTemplateBody);
    }

    /// <summary>
    /// Yields additional component templates to include in the index template based on the data stream naming scheme
    /// </summary>
    /// <returns>
    /// A new list that always starts with <c>data-streams-mappings</c>, followed by the
    /// <c>logs-*</c> or <c>metrics-*</c> settings and mappings components when the data stream type
    /// equals <c>logs</c> or <c>metrics</c> (compared case-insensitively).
    /// </returns>
    protected List<string> GetInferredComponentTemplates()
    {
        var additionalComponents = new List<string> { "data-streams-mappings" };

        // Read the type once and compare without allocating lowered copies. The static
        // string.Equals also tolerates a null type, in which case only the default is returned.
        var type = Options.DataStream.Type;

        // if we know the type of data is logs or metrics apply certain defaults that Elasticsearch ships with.
        if (string.Equals(type, "logs", System.StringComparison.OrdinalIgnoreCase))
            additionalComponents.AddRange(new[] { "logs-settings", "logs-mappings" });
        else if (string.Equals(type, "metrics", System.StringComparison.OrdinalIgnoreCase))
            additionalComponents.AddRange(new[] { "metrics-settings", "metrics-mappings" });

        return additionalComponents;
    }
}