-
Notifications
You must be signed in to change notification settings - Fork 165
/
Copy pathMergeDatedDatums.cpp
104 lines (88 loc) · 3.77 KB
/
MergeDatedDatums.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/************************************************************************
* Copyright(c) 2009, One Unified. All rights reserved. *
* email: [email protected] *
* *
* This file is provided as is WITHOUT ANY WARRANTY *
* without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
* *
* This software may not be used nor distributed without proper license *
* agreement. *
* *
* See the file LICENSE.txt for redistribution information. *
************************************************************************/
#include "MergeDatedDatums.h"
namespace ou { // One Unified
namespace tf { // TradeFrame
//
// MergeDatedDatums
//
MergeDatedDatums::MergeDatedDatums()
: m_state( eInit ), m_request( eUnknown )
{
}
MergeDatedDatums::~MergeDatedDatums() {
while ( !m_mhCarriers.Empty() ) {
MergeCarrierBase *p = m_mhCarriers.RemoveEnd();
delete p;
}
}
void MergeDatedDatums::Add( TimeSeries<Quote>& series, MergeDatedDatums::OnDatumHandler function ) {
m_mhCarriers.Append( new MergeCarrier<Quote>( series, function ) );
}
void MergeDatedDatums::Add( TimeSeries<Trade>& series, MergeDatedDatums::OnDatumHandler function ) {
m_mhCarriers.Append( new MergeCarrier<Trade>( series, function ) );
}
void MergeDatedDatums::Add( TimeSeries<Bar>& series, MergeDatedDatums::OnDatumHandler function ) {
m_mhCarriers.Append( new MergeCarrier<Bar>( series, function ) );
}
void MergeDatedDatums::Add( TimeSeries<Greek>& series, MergeDatedDatums::OnDatumHandler function ) {
m_mhCarriers.Append( new MergeCarrier<Greek>( series, function ) );
}
void MergeDatedDatums::Add( TimeSeries<DepthByMM>& series, MergeDatedDatums::OnDatumHandler function ) {
m_mhCarriers.Append( new MergeCarrier<DepthByMM>( series, function ) );
}
void MergeDatedDatums::Add( TimeSeries<DepthByOrder>& series, MergeDatedDatums::OnDatumHandler function ) {
m_mhCarriers.Append( new MergeCarrier<DepthByOrder>( series, function ) );
}
// http://www.codeguru.com/forum/archive/index.php/t-344661.html
/*
struct SortByMergeCarrier {
public:
SortByMergeCarrier( std::vector<MergeCarrierBase *> *v ): m_v( v ) {};
bool operator() ( size_t lhs, size_t rhs ) { return (*m_v)[lhs]->GetDateTime() < (*m_v)[rhs]->GetDateTime(); };
protected:
std::vector<MergeCarrierBase *> *m_v;
};
*/
// be aware that this maybe running in alternate thread
// the thread is not created in this class
void MergeDatedDatums::Run() {
m_request = eRun;
size_t cntCarriers = m_mhCarriers.Size();
// LOG << "#carriers: " << cntCarriers; // need cross thread writing
MergeCarrierBase* pCarrier = nullptr;
m_cntProcessedDatums = 0;
m_state = eRunning;
while ( ( 0 != cntCarriers ) && ( eRun == m_request ) ) { // once all series have been depleted, end of run
pCarrier = m_mhCarriers.GetRoot();
pCarrier->ProcessDatum(); // automatically loads next datum when done
++m_cntProcessedDatums;
if ( nullptr == pCarrier->GetDatedDatum() ) {
// retire the consumed carrier
m_mhCarriers.ArchiveRoot();
--cntCarriers;
}
else {
// reorder the carriers
m_mhCarriers.SiftDown();
}
}
m_state = eStopped;
// LOG << "Merge stats: " << m_cntProcessedDatums << ", " << m_cntReorders;
}
void MergeDatedDatums::Stop() {
m_request = eStop;
}
} // namespace tf
} // namespace ou