Skip to content

Commit cea80a9

Browse files
committed
streams
1 parent 64e34ec commit cea80a9

File tree

2 files changed

+558
-0
lines changed

2 files changed

+558
-0
lines changed

src/Amortisation4.fs

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
namespace FSharp.Finance.Personal
2+
3+
/// Amortisation calculation module using Functional Reactive Programming with F# Streams
4+
/// This module replicates the functionality of Amortisation.fs using reactive stream processing
5+
module Amortisation4 =
6+
7+
open System
8+
open Calculation
9+
open Currency
10+
open DateDay
11+
open Scheduling
12+
open Util
13+
14+
/// Stream processing utilities for reactive amortisation calculations
15+
module Streams =
16+
17+
/// A simple reactive stream implementation for F#
18+
type Stream<'T> = {
19+
Subscribe: ('T -> unit) -> IDisposable
20+
}
21+
22+
/// Create a stream from a sequence
23+
let fromSeq (seq: 'T seq) : Stream<'T> =
24+
{ Subscribe = fun observer ->
25+
let cancelled = ref false
26+
let disposable = { new IDisposable with member _.Dispose() = cancelled := true }
27+
async {
28+
for item in seq do
29+
if not !cancelled then
30+
observer item
31+
} |> Async.Start
32+
disposable
33+
}
34+
35+
/// Map a function over a stream
36+
let map (f: 'T -> 'U) (stream: Stream<'T>) : Stream<'U> =
37+
{ Subscribe = fun observer ->
38+
stream.Subscribe(f >> observer)
39+
}
40+
41+
/// Filter a stream
42+
let filter (predicate: 'T -> bool) (stream: Stream<'T>) : Stream<'T> =
43+
{ Subscribe = fun observer ->
44+
stream.Subscribe(fun x -> if predicate x then observer x)
45+
}
46+
47+
/// Scan (fold with intermediate results) over a stream
48+
let scan (f: 'State -> 'T -> 'State) (initial: 'State) (stream: Stream<'T>) : Stream<'State> =
49+
{ Subscribe = fun observer ->
50+
let state = ref initial
51+
observer !state // Emit initial state
52+
stream.Subscribe(fun x ->
53+
state := f !state x
54+
observer !state
55+
)
56+
}
57+
58+
/// Combine two streams using a function
59+
let combine (f: 'T -> 'U -> 'V) (stream1: Stream<'T>) (stream2: Stream<'U>) : Stream<'V> =
60+
{ Subscribe = fun observer ->
61+
let latest1 = ref None
62+
let latest2 = ref None
63+
let checkEmit () =
64+
match !latest1, !latest2 with
65+
| Some v1, Some v2 -> observer (f v1 v2)
66+
| _ -> ()
67+
68+
let disp1 = stream1.Subscribe(fun x ->
69+
latest1 := Some x
70+
checkEmit()
71+
)
72+
let disp2 = stream2.Subscribe(fun x ->
73+
latest2 := Some x
74+
checkEmit()
75+
)
76+
77+
{ new IDisposable with
78+
member _.Dispose() =
79+
disp1.Dispose()
80+
disp2.Dispose()
81+
}
82+
}
83+
84+
/// Collect stream values into a list
85+
let toList (stream: Stream<'T>) : 'T list =
86+
let results = ResizeArray<'T>()
87+
let mutable completed = false
88+
use subscription = stream.Subscribe(fun x ->
89+
results.Add(x)
90+
)
91+
// For synchronous streams, we assume immediate completion
92+
results |> List.ofSeq
93+
94+
/// Reactive types for amortisation calculations
95+
type AmortisationState = {
96+
OffsetDay: int<OffsetDay>
97+
Principal: decimal<Currency>
98+
Interest: decimal<Currency>
99+
Fees: decimal<Currency>
100+
TotalBalance: decimal<Currency>
101+
ScheduleItems: Map<int<OffsetDay>, ScheduleItem>
102+
}
103+
104+
/// Events that can occur during amortisation
105+
type AmortisationEvent =
106+
| PaymentDue of ScheduledPayment
107+
| InterestAccrued of decimal<Currency>
108+
| FeeApplied of Fee
109+
| BalanceUpdated of decimal<Currency>
110+
111+
/// Stream-based payment processor
112+
let processPaymentStream (parameters: Parameters) (initialState: AmortisationState) : Streams.Stream<AmortisationEvent> -> Streams.Stream<AmortisationState> =
113+
fun eventStream ->
114+
eventStream
115+
|> Streams.scan (fun state event ->
116+
match event with
117+
| PaymentDue payment ->
118+
// Process scheduled payment
119+
let appliedPayment = AppliedPayment.generate parameters state.OffsetDay payment
120+
let newPrincipal = state.Principal - appliedPayment.PrincipalPortion
121+
let newInterest = state.Interest - appliedPayment.InterestPortion
122+
let newFees = state.Fees - appliedPayment.FeesPortion
123+
124+
{ state with
125+
Principal = newPrincipal
126+
Interest = newInterest
127+
Fees = newFees
128+
TotalBalance = newPrincipal + newInterest + newFees
129+
}
130+
131+
| InterestAccrued amount ->
132+
{ state with
133+
Interest = state.Interest + amount
134+
TotalBalance = state.TotalBalance + amount
135+
}
136+
137+
| FeeApplied fee ->
138+
let feeAmount = Fee.calculate parameters.FeeTypes fee
139+
{ state with
140+
Fees = state.Fees + feeAmount
141+
TotalBalance = state.TotalBalance + feeAmount
142+
}
143+
144+
| BalanceUpdated newBalance ->
145+
{ state with TotalBalance = newBalance }
146+
) initialState
147+
148+
/// Generate interest accrual events stream
149+
let generateInterestStream (parameters: Parameters) (balanceStream: Streams.Stream<AmortisationState>) : Streams.Stream<AmortisationEvent> =
150+
balanceStream
151+
|> Streams.map (fun state ->
152+
let dailyRate = Interest.Daily.calculate parameters.Interest state.Principal
153+
InterestAccrued dailyRate
154+
)
155+
156+
/// Generate fee events stream
157+
let generateFeeStream (parameters: Parameters) (dayStream: Streams.Stream<int<OffsetDay>>) : Streams.Stream<AmortisationEvent> =
158+
dayStream
159+
|> Streams.filter (fun day ->
160+
parameters.FeeTypes |> List.exists (fun feeType ->
161+
Fee.isApplicableOnDay feeType day
162+
))
163+
|> Streams.map (fun day ->
164+
// Find applicable fee for this day
165+
parameters.FeeTypes
166+
|> List.tryFind (fun feeType -> Fee.isApplicableOnDay feeType day)
167+
|> Option.map FeeApplied
168+
|> Option.defaultValue (BalanceUpdated 0m<Currency>)
169+
)
170+
171+
/// Generate payment due events stream
172+
let generatePaymentStream (scheduledPayments: ScheduledPayment array) : Streams.Stream<AmortisationEvent> =
173+
scheduledPayments
174+
|> Streams.fromSeq
175+
|> Streams.map PaymentDue
176+
177+
/// Calculate basic schedule using streams
178+
let calculateBasicScheduleStream (parameters: Parameters) : Streams.Stream<int<OffsetDay>> =
179+
let sp = Scheduling.generate parameters
180+
[1<OffsetDay> .. sp.FinalPaymentDay]
181+
|> Streams.fromSeq
182+
183+
/// Main reactive amortisation calculation
184+
let amortiseReactive (parameters: Parameters) (scheduledPayments: ScheduledPayment array) : ScheduleItem array =
185+
186+
// Initialize state
187+
let initialState = {
188+
OffsetDay = 0<OffsetDay>
189+
Principal = parameters.Principal
190+
Interest = 0m<Currency>
191+
Fees = 0m<Currency>
192+
TotalBalance = parameters.Principal
193+
ScheduleItems = Map.empty
194+
}
195+
196+
// Generate event streams
197+
let dayStream = calculateBasicScheduleStream parameters
198+
let paymentEventStream = generatePaymentStream scheduledPayments
199+
let interestEventStream = generateInterestStream parameters (Streams.fromSeq [initialState])
200+
let feeEventStream = generateFeeStream parameters dayStream
201+
202+
// Combine all event streams
203+
let combinedEventStream =
204+
[paymentEventStream; interestEventStream; feeEventStream]
205+
|> List.fold (fun acc stream ->
206+
Streams.combine (fun _ event -> event) acc stream
207+
) paymentEventStream
208+
209+
// Process events through the payment processor
210+
let stateStream = processPaymentStream parameters initialState combinedEventStream
211+
212+
// Convert final states to schedule items
213+
let finalStates = Streams.toList stateStream
214+
215+
finalStates
216+
|> List.mapi (fun i state ->
217+
let offsetDay = (i + 1) * 1<OffsetDay>
218+
ScheduleItem.create offsetDay state.Principal state.Interest state.Fees
219+
)
220+
|> Array.ofList
221+
222+
/// Stream-based amortisation with enhanced reactive patterns
223+
let amortiseWithStreams (parameters: Parameters) (scheduledPayments: ScheduledPayment array) : ScheduleItem array =
224+
225+
// Create observable sequences for different aspects of the calculation
226+
let daySequence =
227+
let sp = Scheduling.generate parameters
228+
[1<OffsetDay> .. sp.FinalPaymentDay]
229+
230+
// Convert to reactive streams and process
231+
let dayStream = Streams.fromSeq daySequence
232+
233+
// Calculate running balances using scan
234+
let balanceStream =
235+
dayStream
236+
|> Streams.scan (fun (prevBalance, prevInterest, prevFees) currentDay ->
237+
// Find any scheduled payment for this day
238+
let paymentForDay =
239+
scheduledPayments
240+
|> Array.tryFind (fun sp -> sp.OffsetDay = currentDay)
241+
242+
// Calculate daily interest on remaining principal
243+
let dailyInterest = Interest.Daily.calculate parameters.Interest prevBalance
244+
let newInterest = prevInterest + dailyInterest
245+
246+
// Apply any fees for this day
247+
let applicableFees =
248+
parameters.FeeTypes
249+
|> List.filter (fun feeType -> Fee.isApplicableOnDay feeType currentDay)
250+
|> List.sumBy (Fee.calculate parameters.FeeTypes)
251+
let newFees = prevFees + applicableFees
252+
253+
// Apply payment if present
254+
match paymentForDay with
255+
| Some payment ->
256+
let appliedPayment = AppliedPayment.generate parameters currentDay payment
257+
let newBalance = prevBalance - appliedPayment.PrincipalPortion
258+
let finalInterest = newInterest - appliedPayment.InterestPortion
259+
let finalFees = newFees - appliedPayment.FeesPortion
260+
(newBalance, finalInterest, finalFees)
261+
| None ->
262+
(prevBalance, newInterest, newFees)
263+
) (parameters.Principal, 0m<Currency>, 0m<Currency>)
264+
265+
// Convert balance stream to schedule items
266+
let scheduleItemStream =
267+
Streams.combine (fun day (principal, interest, fees) ->
268+
ScheduleItem.create day principal interest fees
269+
) dayStream balanceStream
270+
271+
// Collect results
272+
Streams.toList scheduleItemStream
273+
|> Array.ofList
274+
275+
/// Simplified stream-based amortisation that mirrors the original implementation structure
276+
let amortise (parameters: Parameters) (scheduledPayments: ScheduledPayment array) : ScheduleItem array =
277+
278+
// Generate basic schedule (equivalent to calculateBasicSchedule)
279+
let basicSchedule =
280+
let sp = Scheduling.generate parameters
281+
[|1<OffsetDay> .. sp.FinalPaymentDay|]
282+
283+
// Create payment map for efficient lookup
284+
let paymentMap =
285+
scheduledPayments
286+
|> Array.map (fun sp -> sp.OffsetDay, sp)
287+
|> Map.ofArray
288+
289+
// Process schedule using streams with state accumulation
290+
let scheduleStream = Streams.fromSeq basicSchedule
291+
292+
let finalScheduleItems =
293+
scheduleStream
294+
|> Streams.scan (fun (scheduleMap, currentPrincipal, currentInterest, currentFees) offsetDay ->
295+
296+
// Calculate daily interest
297+
let dailyInterest = Interest.Daily.calculate parameters.Interest currentPrincipal
298+
let newInterest = currentInterest + dailyInterest
299+
300+
// Apply any fees for this day
301+
let dailyFees =
302+
parameters.FeeTypes
303+
|> List.filter (fun feeType -> Fee.isApplicableOnDay feeType offsetDay)
304+
|> List.sumBy (Fee.calculate parameters.FeeTypes)
305+
let newFees = currentFees + dailyFees
306+
307+
// Check for scheduled payment
308+
let (finalPrincipal, finalInterest, finalFees) =
309+
match Map.tryFind offsetDay paymentMap with
310+
| Some payment ->
311+
let appliedPayment = AppliedPayment.generate parameters offsetDay payment
312+
(currentPrincipal - appliedPayment.PrincipalPortion,
313+
newInterest - appliedPayment.InterestPortion,
314+
newFees - appliedPayment.FeesPortion)
315+
| None ->
316+
(currentPrincipal, newInterest, newFees)
317+
318+
// Create schedule item
319+
let scheduleItem = ScheduleItem.create offsetDay finalPrincipal finalInterest finalFees
320+
let updatedMap = Map.add offsetDay scheduleItem scheduleMap
321+
322+
(updatedMap, finalPrincipal, finalInterest, finalFees)
323+
324+
) (Map.empty, parameters.Principal, 0m<Currency>, 0m<Currency>)
325+
|> Streams.toList
326+
|> List.map (fun (scheduleMap, _, _, _) -> scheduleMap)
327+
|> List.last
328+
329+
// Convert map to array, sorted by offset day
330+
finalScheduleItems
331+
|> Map.toArray
332+
|> Array.map snd
333+
|> Array.sortBy (fun si -> si.OffsetDay)

0 commit comments

Comments
 (0)