source: trunk/src/corelib/concurrent/qtconcurrentiteratekernel.h@ 467

Last change on this file since 467 was 2, checked in by Dmitry A. Kuminov, 16 years ago

Initially imported qt-all-opensource-src-4.5.1 from Trolltech.

File size: 9.7 KB
Line 
1/****************************************************************************
2**
3** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
4** Contact: Qt Software Information ([email protected])
5**
6** This file is part of the QtCore module of the Qt Toolkit.
7**
8** $QT_BEGIN_LICENSE:LGPL$
9** Commercial Usage
10** Licensees holding valid Qt Commercial licenses may use this file in
11** accordance with the Qt Commercial License Agreement provided with the
12** Software or, alternatively, in accordance with the terms contained in
13** a written agreement between you and Nokia.
14**
15** GNU Lesser General Public License Usage
16** Alternatively, this file may be used under the terms of the GNU Lesser
17** General Public License version 2.1 as published by the Free Software
18** Foundation and appearing in the file LICENSE.LGPL included in the
19** packaging of this file. Please review the following information to
20** ensure the GNU Lesser General Public License version 2.1 requirements
21** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
22**
23** In addition, as a special exception, Nokia gives you certain
24** additional rights. These rights are described in the Nokia Qt LGPL
25** Exception version 1.0, included in the file LGPL_EXCEPTION.txt in this
26** package.
27**
28** GNU General Public License Usage
29** Alternatively, this file may be used under the terms of the GNU
30** General Public License version 3.0 as published by the Free Software
31** Foundation and appearing in the file LICENSE.GPL included in the
32** packaging of this file. Please review the following information to
33** ensure the GNU General Public License version 3.0 requirements will be
34** met: http://www.gnu.org/copyleft/gpl.html.
35**
36** If you are unsure which license is appropriate for your use, please
37** contact the sales department at [email protected].
38** $QT_END_LICENSE$
39**
40****************************************************************************/
41
42#ifndef QTCONCURRENT_ITERATEKERNEL_H
43#define QTCONCURRENT_ITERATEKERNEL_H
44
45#include <QtCore/qglobal.h>
46
47#ifndef QT_NO_CONCURRENT
48
49#include <QtCore/qatomic.h>
50#include <QtCore/qtconcurrentmedian.h>
51#include <QtCore/qtconcurrentthreadengine.h>
52#include <iterator>
53
54
55QT_BEGIN_HEADER
56QT_BEGIN_NAMESPACE
57
58QT_MODULE(Core)
59
60#ifndef qdoc
61
62namespace QtConcurrent {
63
#ifndef QT_NO_STL
    // The STL is available: reuse the standard iterator-advancing algorithm.
    using std::advance;
#else
    // Fallback for builds without the STL: moves 'it' forward by 'value'
    // steps through operator+=, which the iterator type has to provide.
    template <typename It, typename T>
    void advance(It &it, T value)
    {
        it += value;
    }
#endif
73
74/*
75 The BlockSizeManager class manages how many iterations a thread should
76 reserve and process at a time. This is done by measuring the time spent
77 in the user code versus the control part code, and then increasing
78 the block size if the ratio between them is to small. The block size
79 management is done on the basis of the median of several timing measuremens,
80 and it is done induvidualy for each thread.
81*/
82class Q_CORE_EXPORT BlockSizeManager
83{
84public:
85 BlockSizeManager(int iterationCount);
86 void timeBeforeUser();
87 void timeAfterUser();
88 int blockSize();
89private:
90 inline bool blockSizeMaxed()
91 {
92 return (m_blockSize >= maxBlockSize);
93 }
94
95 const int maxBlockSize;
96 qint64 beforeUser;
97 qint64 afterUser;
98 Median<double> controlPartElapsed;
99 Median<double> userPartElapsed;
100 int m_blockSize;
101};
102
103template <typename T>
104class ResultReporter
105{
106public:
107 ResultReporter(ThreadEngine<T> *_threadEngine)
108 :threadEngine(_threadEngine)
109 {
110
111 }
112
113 void reserveSpace(int resultCount)
114 {
115 currentResultCount = resultCount;
116 vector.resize(qMax(resultCount, vector.count()));
117 }
118
119 void reportResults(int begin)
120 {
121 const int useVectorThreshold = 4; // Tunable parameter.
122 if (currentResultCount > useVectorThreshold) {
123 vector.resize(currentResultCount);
124 threadEngine->reportResults(vector, begin);
125 } else {
126 for (int i = 0; i < currentResultCount; ++i)
127 threadEngine->reportResult(&vector.at(i), begin + i);
128 }
129 }
130
131 inline T * getPointer()
132 {
133 return vector.data();
134 }
135
136 int currentResultCount;
137 ThreadEngine<T> *threadEngine;
138 QVector<T> vector;
139};
140
141template <>
142class ResultReporter<void>
143{
144public:
145 inline ResultReporter(ThreadEngine<void> *) { }
146 inline void reserveSpace(int) { };
147 inline void reportResults(int) { };
148 inline void * getPointer() { return 0; }
149};
150
/*
    selectIteration() maps an iterator category tag to the iteration
    strategy: true selects "for" iteration, false selects "while"
    iteration. Only random access iterators select "for" iteration.
*/
inline bool selectIteration(std::forward_iterator_tag)
{
    // Forward iterators: "while" iteration.
    return false;
}

inline bool selectIteration(std::bidirectional_iterator_tag)
{
    // Bidirectional iterators: "while" iteration.
    return false;
}

inline bool selectIteration(std::random_access_iterator_tag)
{
    // Random access iterators: "for" iteration.
    return true;
}
165
166template <typename Iterator, typename T>
167class IterateKernel : public ThreadEngine<T>
168{
169public:
170 typedef T ResultType;
171
172 IterateKernel(Iterator _begin, Iterator _end)
173#ifndef QT_NO_PARTIAL_TEMPLATE_SPECIALIZATION
174 : begin(_begin), end(_end), current(_begin), currentIndex(0),
175 forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
176#else
177 : begin(_begin), end(_end), currentIndex(0),
178 forIteration(selectIteration(std::iterator_category(_begin))), progressReportingEnabled(true)
179#endif
180 {
181 iterationCount = forIteration ? std::distance(_begin, _end) : 0;
182 }
183
184 virtual ~IterateKernel() { }
185
186 virtual bool runIteration(Iterator it, int index , T *result)
187 { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
188 virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
189 { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
190
191 void start()
192 {
193 progressReportingEnabled = this->isProgressReportingEnabled();
194 if (progressReportingEnabled && iterationCount > 0)
195 this->setProgressRange(0, iterationCount);
196 }
197
198 bool shouldStartThread()
199 {
200 if (forIteration)
201 return (currentIndex < iterationCount) && !this->shouldThrottleThread();
202 else // whileIteration
203 return (iteratorThreads == 0);
204 }
205
206 ThreadFunctionResult threadFunction()
207 {
208 if (forIteration)
209 return this->forThreadFunction();
210 else // whileIteration
211 return this->whileThreadFunction();
212 }
213
214 ThreadFunctionResult forThreadFunction()
215 {
216 BlockSizeManager blockSizeManager(iterationCount);
217 ResultReporter<T> resultReporter(this);
218
219 for(;;) {
220 if (this->isCanceled())
221 break;
222
223 const int currentBlockSize = blockSizeManager.blockSize();
224
225 if (currentIndex >= iterationCount)
226 break;
227
228 // Atomically reserve a block of iterationCount for this thread.
229 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
230 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
231
232 if (beginIndex >= endIndex) {
233 // No more work
234 break;
235 }
236
237 this->waitForResume(); // (only waits if the qfuture is paused.)
238
239 if (shouldStartThread())
240 this->startThread();
241
242 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
243 resultReporter.reserveSpace(finalBlockSize);
244
245 // Call user code with the current iteration range.
246 blockSizeManager.timeBeforeUser();
247 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
248 blockSizeManager.timeAfterUser();
249
250 if (resultsAvailable)
251 resultReporter.reportResults(beginIndex);
252
253 // Report progress if progress reporting enabled.
254 if (progressReportingEnabled) {
255 completed.fetchAndAddAcquire(finalBlockSize);
256 this->setProgressValue(this->completed);
257 }
258
259 if (this->shouldThrottleThread())
260 return ThrottleThread;
261 }
262 return ThreadFinished;
263 }
264
265 ThreadFunctionResult whileThreadFunction()
266 {
267 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
268 return ThreadFinished;
269
270 ResultReporter<T> resultReporter(this);
271 resultReporter.reserveSpace(1);
272
273 while (current != end) {
274 // The following two lines breaks support for input iterators according to
275 // the sgi docs: dereferencing prev after calling ++current is not allowed
276 // on input iterators. (prev is dereferenced inside user.runIteration())
277 Iterator prev = current;
278 ++current;
279 int index = currentIndex.fetchAndAddRelaxed(1);
280 iteratorThreads.testAndSetRelease(1, 0);
281
282 this->waitForResume(); // (only waits if the qfuture is paused.)
283
284 if (shouldStartThread())