source: trunk/src/corelib/concurrent/qtconcurrentreducekernel.h@ 497

Last change on this file since 497 was 2, checked in by Dmitry A. Kuminov, 16 years ago

Initially imported qt-all-opensource-src-4.5.1 from Trolltech.

File size: 7.4 KB
Line 
1/****************************************************************************
2**
3** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
4** Contact: Qt Software Information (qt-info@nokia.com)
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial Usage
10** Licensees holding valid Qt Commercial licenses may use this file in
11** accordance with the Qt Commercial License Agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and Nokia.
14**
15** GNU Lesser General Public License Usage
16** Alternatively, this file may be used under the terms of the GNU Lesser
17** General Public License version 2.1 as published by the Free Software
18** Foundation and appearing in the file LICENSE.LGPL included in the
19** packaging of this file. Please review the following information to
20** ensure the GNU Lesser General Public License version 2.1 requirements
21** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
22**
23** In addition, as a special exception, Nokia gives you certain
24** additional rights. These rights are described in the Nokia Qt LGPL
25** Exception version 1.0, included in the file LGPL_EXCEPTION.txt in this
26** package.
27**
28** GNU General Public License Usage
29** Alternatively, this file may be used under the terms of the GNU
30** General Public License version 3.0 as published by the Free Software
31** Foundation and appearing in the file LICENSE.GPL included in the
32** packaging of this file. Please review the following information to
33** ensure the GNU General Public License version 3.0 requirements will be
34** met: http://www.gnu.org/copyleft/gpl.html.
35**
36** If you are unsure which license is appropriate for your use, please
37** contact the sales department at qt-sales@nokia.com.
38** $QT_END_LICENSE$
39**
40****************************************************************************/
41
42#ifndef QTCONCURRENT_REDUCEKERNEL_H
43#define QTCONCURRENT_REDUCEKERNEL_H
44
45#include <QtCore/qglobal.h>
46
47#ifndef QT_NO_CONCURRENT
48
49#include <QtCore/qatomic.h>
50#include <QtCore/qlist.h>
51#include <QtCore/qmap.h>
52#include <QtCore/qmutex.h>
53#include <QtCore/qthread.h>
54#include <QtCore/qthreadpool.h>
55#include <QtCore/qvector.h>
56
57QT_BEGIN_HEADER
58QT_BEGIN_NAMESPACE
59
60QT_MODULE(Core)
61
62namespace QtConcurrent {
63
64#ifndef qdoc
65
/*
    Tuning constants that bound the size of the MapReduce reduce queue.

    Both values are per-thread factors: ReduceKernel multiplies them by
    its thread count before comparing the product against the number of
    queued reduce blocks.

    ReduceKernel::shouldStartThread() reports true only while the queue
    holds at most ReduceQueueStartLimit blocks per thread, and
    ReduceKernel::shouldThrottle() reports true once it holds more than
    ReduceQueueThrottleLimit blocks per thread.
*/
enum {
    ReduceQueueThrottleLimit = 30,
    ReduceQueueStartLimit = 20
};
77
78// IntermediateResults holds a block of intermediate results from a
79// map or filter functor. The begin/end offsets indicates the origin
80// and range of the block.
81template <typename T>
82class IntermediateResults
83{
84public:
85 int begin, end;
86 QVector<T> vector;
87};
88
89#endif // qdoc
90
91enum ReduceOption {
92 UnorderedReduce = 0x1,
93 OrderedReduce = 0x2,
94 SequentialReduce = 0x4
95 // ParallelReduce = 0x8
96};
97Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
98Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
99
100#ifndef qdoc
101
102// supports both ordered and out-of-order reduction
103template <typename ReduceFunctor, typename ReduceResultType, typename T>
104class ReduceKernel
105{
106 typedef QMap<int, IntermediateResults<T> > ResultsMap;
107
108 const ReduceOptions reduceOptions;
109
110 QMutex mutex;
111 int progress, resultsMapSize, threadCount;
112 ResultsMap resultsMap;
113
114 bool canReduce(int begin) const
115 {
116 return (((reduceOptions & UnorderedReduce)
117 && progress == 0)
118 || ((reduceOptions & OrderedReduce)
119 && progress == begin));
120 }
121
122 void reduceResult(ReduceFunctor &reduce,
123 ReduceResultType &r,
124 const IntermediateResults<T> &result)
125 {
126 for (int i = 0; i < result.vector.size(); ++i) {
127 reduce(r, result.vector.at(i));
128 }
129 }
130
131 void reduceResults(ReduceFunctor &reduce,
132 ReduceResultType &r,
133 ResultsMap &map)
134 {
135 typename ResultsMap::iterator it = map.begin();
136 while (it != map.end()) {
137 reduceResult(reduce, r, it.value());
138 ++it;
139 }
140 }
141
142public:
143 ReduceKernel(ReduceOptions _reduceOptions)
144 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
145 threadCount(QThreadPool::globalInstance()->maxThreadCount())
146 { }
147
148 void runReduce(ReduceFunctor &reduce,
149 ReduceResultType &r,
150 const IntermediateResults<T> &result)
151 {
152 QMutexLocker locker(&mutex);
153 if (!canReduce(result.begin)) {
154 ++resultsMapSize;
155 resultsMap.insert(result.begin, result);
156 return;
157 }
158
159 if (reduceOptions & UnorderedReduce) {
160 // UnorderedReduce
161 progress = -1;
162
163 // reduce this result
164 locker.unlock();
165 reduceResult(reduce, r, result);
166 locker.relock();
167
168 // reduce all stored results as well
169 while (!resultsMap.isEmpty()) {
170 ResultsMap resultsMapCopy = resultsMap;
171 resultsMap.clear();
172
173 locker.unlock();
174 reduceResults(reduce, r, resultsMapCopy);
175 locker.relock();
176
177 resultsMapSize -= resultsMapCopy.size();
178 }
179
180 progress = 0;
181 } else {
182 // reduce this result
183 locker.unlock();
184 reduceResult(reduce, r, result);
185 locker.relock();
186
187 // OrderedReduce
188 progress += result.end - result.begin;
189
190 // reduce as many other results as possible
191 typename ResultsMap::iterator it = resultsMap.begin();
192 while (it != resultsMap.end()) {
193 if (it.value().begin != progress)
194 break;
195
196 locker.unlock();
197 reduceResult(reduce, r, it.value());
198 locker.relock();
199
200 --resultsMapSize;
201 progress += it.value().end - it.value().begin;
202 it = resultsMap.erase(it);
203 }
204 }
205 }
206
207 // final reduction
208 void finish(ReduceFunctor &reduce, ReduceResultType &r)
209 {
210 reduceResults(reduce, r, resultsMap);
211 }
212
213 inline bool shouldThrottle()
214 {
215 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
216 }
217
218 inline bool shouldStartThread()
219 {
220 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
221 }
222};
223
224template <typename Sequence, typename Base, typename Functor1, typename Functor2>
225struct SequenceHolder2 : public Base
226{
227 SequenceHolder2(const Sequence &_sequence,
228 Functor1 functor1,
229 Functor2 functor2,
230 ReduceOptions reduceOptions)
231 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
232 sequence(_sequence)
233 { }
234
235 Sequence sequence;
236
237 void finish()
238 {
239 Base::finish();
240 // Clear the sequence to make sure all temporaries are destroyed
241 // before finished is signaled.
242 sequence = Sequence();
243 }
244};
245
246#endif //qdoc
247
248} // namespace QtConcurrent
249
250QT_END_NAMESPACE
251QT_END_HEADER
252
253#endif // QT_NO_CONCURRENT
254
255#endif
Note: See TracBrowser for help on using the repository browser.