Commit 3064fd5

fix

1 parent 2d963f3 commit 3064fd5

3 files changed, +86 -163 lines changed

test/versioned/langchain-aws/runnables-streaming.test.js

Lines changed: 1 addition & 22 deletions
@@ -14,9 +14,7 @@ const { match } = require('../../lib/custom-assertions')
 const {
   runStreamingEnabledTests,
   runStreamingDisabledTest,
-  runAiMonitoringDisabledTests,
-  runBothDisabledTest,
-  runStreamingEdgeCasesTests
+  runAiMonitoringDisabledTests
 } = require('../langchain/runnables-streaming')
 const { FAKE_CREDENTIALS, getAiResponseServer } = require('../../lib/aws-server-stubs')
 const helper = require('../../lib/agent_helper')
@@ -112,22 +110,3 @@ test('ai_monitoring disabled', async (t) => {
     expectedContent: () => 'This is a test.'
   })(t)
 })
-
-test('both ai_monitoring and streaming disabled', async (t) => {
-  t.beforeEach((ctx) => beforeEach({ enabled: false, ctx }))
-  t.afterEach((ctx) => afterEach(ctx))
-
-  await runBothDisabledTest({
-    inputData: { topic: 'streamed' },
-    expectedContent: () => 'This is a test.'
-  })(t)
-})
-
-test('streaming enabled - edge cases', async (t) => {
-  t.beforeEach((ctx) => beforeEach({ enabled: true, ctx }))
-  t.afterEach((ctx) => afterEach(ctx))
-
-  await runStreamingEdgeCasesTests({
-    inputData: { topic: 'streamed' }
-  })(t)
-})
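Note: each helper required from ../langchain/runnables-streaming is a curried suite. It takes a config object and returns an async function that is invoked with the node:test subtest context, hence the `runAiMonitoringDisabledTests({ ... })(t)` call shape above. A minimal sketch of that pattern; `runExampleSuite` and its subtest body are illustrative, not from the repo:

const assert = require('node:assert')

// Hypothetical suite following the curried shape the real helpers use:
// configure once, then invoke the returned function with the test context,
// e.g. `await runExampleSuite({ inputData })(t)`.
function runExampleSuite(config) {
  const { inputData, expectedContent } = config

  return async (t) => {
    await t.test('example subtest', () => {
      // The real suites stream `inputData` through a chain and compare the
      // accumulated output against `expectedContent()`.
      assert.ok(inputData)
      if (expectedContent) assert.ok(expectedContent())
    })
  }
}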

test/versioned/langchain-openai/runnables-streaming.test.js

Lines changed: 1 addition & 22 deletions
@@ -13,9 +13,7 @@ const { match } = require('../../lib/custom-assertions')
 const {
   runStreamingEnabledTests,
   runStreamingDisabledTest,
-  runAiMonitoringDisabledTests,
-  runBothDisabledTest,
-  runStreamingEdgeCasesTests
+  runAiMonitoringDisabledTests
 } = require('../langchain/runnables-streaming')
 const createOpenAIMockServer = require('../openai/mock-server')
 const mockResponses = require('../openai/mock-chat-api-responses')
@@ -114,22 +112,3 @@ test('ai_monitoring disabled', async (t) => {
     expectedContent: () => mockResponses.get('Streamed response').streamData
   })(t)
 })
-
-test('both ai_monitoring and streaming disabled', async (t) => {
-  t.beforeEach((ctx) => beforeEach({ enabled: false, ctx }))
-  t.afterEach((ctx) => afterEach(ctx))
-
-  await runBothDisabledTest({
-    inputData: { topic: 'Streamed' },
-    expectedContent: () => mockResponses.get('Streamed response').streamData
-  })(t)
-})
-
-test('streaming enabled - edge cases', async (t) => {
-  t.beforeEach((ctx) => beforeEach({ enabled: true, ctx }))
-  t.afterEach((ctx) => afterEach(ctx))
-
-  await runStreamingEdgeCasesTests({
-    inputData: { topic: 'Streamed' }
-  })(t)
-})

test/versioned/langchain/runnables-streaming.js

Lines changed: 84 additions & 119 deletions
@@ -541,6 +541,75 @@ function runStreamingEnabledTests(config) {
         end()
       })
     })
+
+    await t.test(
+      'should handle metadata properly during stream processing',
+      (t, end) => {
+        const { agent, prompt, model, outputParser } = t.nr
+
+        helper.runInTransaction(agent, async (tx) => {
+          const input = inputData
+          const options = {
+            metadata: { streamKey: 'streamValue', anotherKey: 'anotherValue' },
+            tags: ['stream-tag1', 'stream-tag2']
+          }
+
+          const chain = prompt.pipe(model).pipe(outputParser)
+          const stream = await chain.stream(input, options)
+          for await (const chunk of stream) {
+            consumeStreamChunk(chunk)
+          }
+
+          const events = agent.customEventAggregator.events.toArray()
+          const langchainEvents = filterLangchainEvents(events)
+          const langChainSummaryEvents = filterLangchainEventsByType(
+            langchainEvents,
+            'LlmChatCompletionSummary'
+          )
+
+          const [[, summary]] = langChainSummaryEvents
+          assert.equal(summary['metadata.streamKey'], 'streamValue')
+          assert.equal(summary['metadata.anotherKey'], 'anotherValue')
+
+          const tags = summary.tags.split(',')
+          assert.ok(tags.includes('stream-tag1'))
+          assert.ok(tags.includes('stream-tag2'))
+
+          tx.end()
+          end()
+        })
+      }
+    )
+
+    await t.test(
+      'should properly extend segment duration on each stream iteration',
+      (t, end) => {
+        const { agent, prompt, model, outputParser } = t.nr
+
+        helper.runInTransaction(agent, async (tx) => {
+          const input = inputData
+
+          const chain = prompt.pipe(model).pipe(outputParser)
+          const stream = await chain.stream(input)
+
+          const [segment] = tx.trace.getChildren(tx.trace.root.id)
+          assert.equal(segment.name, 'Llm/chain/Langchain/stream', 'should find the Langchain stream segment')
+
+          let chunkCount = 0
+          for await (const chunk of stream) {
+            consumeStreamChunk(chunk)
+            chunkCount++
+          }
+
+          // Segment should have been touched multiple times during streaming
+          assert.ok(chunkCount > 1, 'should have received multiple chunks')
+          assert.ok(segment.timer.hrDuration)
+
+          tx.end()
+          end()
+        })
+      }
+    )
   }
 }
 
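Note: with these two subtests folded into runStreamingEnabledTests, the versioned files above pick them up through their existing streaming-enabled blocks; no new call sites are needed. A sketch of such a call site, assuming it mirrors the 'ai_monitoring disabled' blocks shown earlier (the beforeEach/afterEach helpers and the expected content are assumptions from context):

// Sketch: the relocated metadata and segment-duration subtests now run
// inside this suite; `beforeEach`/`afterEach` are the test file's own
// setup helpers (assumed, not shown in this diff).
test('streaming enabled', async (t) => {
  t.beforeEach((ctx) => beforeEach({ enabled: true, ctx }))
  t.afterEach((ctx) => afterEach(ctx))

  await runStreamingEnabledTests({
    inputData: { topic: 'streamed' },
    expectedContent: () => 'This is a test.'
  })(t)
})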

@@ -601,55 +670,9 @@ function runStreamingDisabledTest(config) {
         })
       }
     )
-  }
-}
-
-/**
- * Runs the ai_monitoring disabled tests
- * @param {object} config Configuration for the test suite
- * @param {object} config.inputData The input data to pass to stream calls
- * @param {Function} [config.expectedContent] Function to get expected content
- */
-function runAiMonitoringDisabledTests(config) {
-  const { inputData, expectedContent } = config
-
-  function consumeStreamChunk() {
-    // A no-op function used to consume chunks of a stream.
-  }
 
-  return async (t) => {
     await t.test(
-      'should not create llm events when `ai_monitoring.enabled` is false',
-      (t, end) => {
-        const { agent, prompt, outputParser, model } = t.nr
-        agent.config.ai_monitoring.enabled = false
-
-        helper.runInTransaction(agent, async (tx) => {
-          const input = inputData
-
-          const chain = prompt.pipe(model).pipe(outputParser)
-          const stream = await chain.stream(input)
-          let content = ''
-          for await (const chunk of stream) {
-            content += chunk
-          }
-
-          if (expectedContent) {
-            const expected = expectedContent()
-            assert.equal(content, expected)
-          }
-
-          const events = agent.customEventAggregator.events.toArray()
-          assert.equal(events.length, 0, 'should not create llm events when ai_monitoring is disabled')
-
-          tx.end()
-          end()
-        })
-      }
-    )
-
-    await t.test(
-      'should not create segment when `ai_monitoring.enabled` is false',
+      'should not create segment when `ai_monitoring.streaming.enabled` is false',
       (t, end) => {
         const { agent, prompt, outputParser, model } = t.nr
         agent.config.ai_monitoring.enabled = false
@@ -675,17 +698,21 @@ function runAiMonitoringDisabledTests(config) {
 }
 
 /**
- * Runs the test for both ai_monitoring and streaming disabled
+ * Runs the ai_monitoring disabled tests
  * @param {object} config Configuration for the test suite
  * @param {object} config.inputData The input data to pass to stream calls
  * @param {Function} [config.expectedContent] Function to get expected content
  */
-function runBothDisabledTest(config) {
+function runAiMonitoringDisabledTests(config) {
   const { inputData, expectedContent } = config
 
+  function consumeStreamChunk() {
+    // A no-op function used to consume chunks of a stream.
+  }
+
   return async (t) => {
     await t.test(
-      'should not create llm events when both `ai_monitoring.enabled` and `ai_monitoring.streaming.enabled` are false',
+      'should not create llm events when `ai_monitoring.enabled` is false',
       (t, end) => {
         const { agent, prompt, outputParser, model } = t.nr
         agent.config.ai_monitoring.enabled = false
@@ -706,61 +733,7 @@ function runBothDisabledTest(config) {
           }
 
           const events = agent.customEventAggregator.events.toArray()
-          assert.equal(events.length, 0, 'should not create llm events when both configs are disabled')
-
-          tx.end()
-          end()
-        })
-      }
-    )
-  }
-}
-
-/**
- * Runs the streaming enabled edge cases tests
- * @param {object} config Configuration for the test suite
- * @param {object} config.inputData The input data to pass to stream calls
- */
-function runStreamingEdgeCasesTests(config) {
-  const { inputData } = config
-
-  function consumeStreamChunk() {
-    // A no-op function used to consume chunks of a stream.
-  }
-
-  return async (t) => {
-    await t.test(
-      'should handle metadata properly during stream processing',
-      (t, end) => {
-        const { agent, prompt, model, outputParser } = t.nr
-
-        helper.runInTransaction(agent, async (tx) => {
-          const input = inputData
-          const options = {
-            metadata: { streamKey: 'streamValue', anotherKey: 'anotherValue' },
-            tags: ['stream-tag1', 'stream-tag2']
-          }
-
-          const chain = prompt.pipe(model).pipe(outputParser)
-          const stream = await chain.stream(input, options)
-          for await (const chunk of stream) {
-            consumeStreamChunk(chunk)
-          }
-
-          const events = agent.customEventAggregator.events.toArray()
-          const langchainEvents = filterLangchainEvents(events)
-          const langChainSummaryEvents = filterLangchainEventsByType(
-            langchainEvents,
-            'LlmChatCompletionSummary'
-          )
-
-          const [[, summary]] = langChainSummaryEvents
-          assert.equal(summary['metadata.streamKey'], 'streamValue')
-          assert.equal(summary['metadata.anotherKey'], 'anotherValue')
-
-          const tags = summary.tags.split(',')
-          assert.ok(tags.includes('stream-tag1'))
-          assert.ok(tags.includes('stream-tag2'))
+          assert.equal(events.length, 0, 'should not create llm events when ai_monitoring is disabled')
 
           tx.end()
           end()
@@ -769,28 +742,22 @@ function runStreamingEdgeCasesTests(config) {
     )
 
     await t.test(
-      'should properly extend segment duration on each stream iteration',
+      'should not create segment when `ai_monitoring.enabled` is false',
       (t, end) => {
-        const { agent, prompt, model, outputParser } = t.nr
+        const { agent, prompt, outputParser, model } = t.nr
+        agent.config.ai_monitoring.enabled = false
 
         helper.runInTransaction(agent, async (tx) => {
           const input = inputData
 
           const chain = prompt.pipe(model).pipe(outputParser)
           const stream = await chain.stream(input)
-
-          const [segment] = tx.trace.getChildren(tx.trace.root.id)
-          assert.equal(segment.name, 'Llm/chain/Langchain/stream', 'should find the Langchain stream segment')
-
-          let chunkCount = 0
           for await (const chunk of stream) {
             consumeStreamChunk(chunk)
-            chunkCount++
           }
 
-          // Segment should have been touched multiple times during streaming
-          assert.ok(chunkCount > 1, 'should have received multiple chunks')
-          assert.ok(segment.timer.hrDuration)
+          const segment = findSegment(tx.trace, tx.trace.root, 'Llm/chain/Langchain/stream')
+          assert.equal(segment, undefined, 'should not create Llm/chain/Langchain/stream segment when ai_monitoring is disabled')
 
           tx.end()
           end()
@@ -803,7 +770,5 @@ function runStreamingEdgeCasesTests(config) {
 module.exports = {
   runStreamingEnabledTests,
   runStreamingDisabledTest,
-  runAiMonitoringDisabledTests,
-  runBothDisabledTest,
-  runStreamingEdgeCasesTests
+  runAiMonitoringDisabledTests
 }
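Note: after this commit the shared module exposes exactly the three suites the versioned files consume; runBothDisabledTest and runStreamingEdgeCasesTests are no longer exported, so any stale require of those names would destructure to undefined and throw a TypeError when invoked. A sketch of the trimmed consumption, mirroring the requires at the top of both test files:

// The only suites still exported by the shared module after this change.
const {
  runStreamingEnabledTests,
  runStreamingDisabledTest,
  runAiMonitoringDisabledTests
} = require('../langchain/runnables-streaming')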
