source: trunk/src/corelib/concurrent/qfutureinterface.cpp@ 643

Last change on this file since 643 was 561, checked in by Dmitry A. Kuminov, 15 years ago

trunk: Merged in qt 4.6.1 sources.

File size: 16.7 KB
Line 
1/****************************************************************************
2**
3** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
4** All rights reserved.
5** Contact: Nokia Corporation (qt-info@nokia.com)
6**
7** This file is part of the QtCore module of the Qt Toolkit.
8**
9** $QT_BEGIN_LICENSE:LGPL$
10** Commercial Usage
11** Licensees holding valid Qt Commercial licenses may use this file in
12** accordance with the Qt Commercial License Agreement provided with the
13** Software or, alternatively, in accordance with the terms contained in
14** a written agreement between you and Nokia.
15**
16** GNU Lesser General Public License Usage
17** Alternatively, this file may be used under the terms of the GNU Lesser
18** General Public License version 2.1 as published by the Free Software
19** Foundation and appearing in the file LICENSE.LGPL included in the
20** packaging of this file. Please review the following information to
21** ensure the GNU Lesser General Public License version 2.1 requirements
22** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
23**
24** In addition, as a special exception, Nokia gives you certain additional
25** rights. These rights are described in the Nokia Qt LGPL Exception
26** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
27**
28** GNU General Public License Usage
29** Alternatively, this file may be used under the terms of the GNU
30** General Public License version 3.0 as published by the Free Software
31** Foundation and appearing in the file LICENSE.GPL included in the
32** packaging of this file. Please review the following information to
33** ensure the GNU General Public License version 3.0 requirements will be
34** met: http://www.gnu.org/copyleft/gpl.html.
35**
36** If you have questions regarding the use of this file, please contact
37** Nokia at qt-info@nokia.com.
38** $QT_END_LICENSE$
39**
40****************************************************************************/
41
42// qfutureinterface.h included from qfuture.h
43#include "qfuture.h"
44
45#ifndef QT_NO_QFUTURE
46
47#include <QtCore/qatomic.h>
48#include <QtCore/qthread.h>
49#include <QtCore/qthreadpool.h>
50#include <private/qthreadpool_p.h>
51
52#include "qfutureinterface_p.h"
53
54QT_BEGIN_NAMESPACE
55
// Upper bound on how many progress call-outs are emitted per second.
// The progress code in this file turns it into a minimum interval of
// (1000 / MaxProgressEmitsPerSecond) milliseconds between emissions.
enum {
    MaxProgressEmitsPerSecond = 25
};
59
60QFutureInterfaceBase::QFutureInterfaceBase(State initialState)
61 : d(new QFutureInterfaceBasePrivate(initialState))
62{ }
63
64QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other)
65 : d(other.d)
66{
67 d->refCount.ref();
68}
69
70QFutureInterfaceBase::~QFutureInterfaceBase()
71{
72 if (!d->refCount.deref())
73 delete d;
74}
75
76void QFutureInterfaceBase::cancel()
77{
78 QMutexLocker locker(&d->m_mutex);
79 if (d->state & Canceled)
80 return;
81
82 d->state = State((d->state & ~Paused) | Canceled);
83 d->waitCondition.wakeAll();
84 d->pausedWaitCondition.wakeAll();
85 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
86}
87
88void QFutureInterfaceBase::setPaused(bool paused)
89{
90 QMutexLocker locker(&d->m_mutex);
91 if (paused) {
92 d->state = State(d->state | Paused);
93 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
94 } else {
95 d->state = State(d->state & ~Paused);
96 d->pausedWaitCondition.wakeAll();
97 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
98 }
99}
100
101void QFutureInterfaceBase::togglePaused()
102{
103 QMutexLocker locker(&d->m_mutex);
104 if (d->state & Paused) {
105 d->state = State(d->state & ~Paused);
106 d->pausedWaitCondition.wakeAll();
107 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
108 } else {
109 d->state = State(d->state | Paused);
110 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
111 }
112}
113
114void QFutureInterfaceBase::setThrottled(bool enable)
115{
116 // bail out if we are not changing the state
117 if ((enable && (d->state & Throttled)) || (!enable && !(d->state & Throttled)))
118 return;
119
120 // lock and change the state
121 QMutexLocker lock(&d->m_mutex);
122 if (enable) {
123 d->state = State(d->state | Throttled);
124 } else {
125 d->state = State(d->state & ~Throttled);
126 if (!(d->state & Paused))
127 d->pausedWaitCondition.wakeAll();
128 }
129}
130
131
132bool QFutureInterfaceBase::isRunning() const
133{
134 return queryState(Running);
135}
136
137bool QFutureInterfaceBase::isStarted() const
138{
139 return queryState(Started);
140}
141
142bool QFutureInterfaceBase::isCanceled() const
143{
144 return queryState(Canceled);
145}
146
147bool QFutureInterfaceBase::isFinished() const
148{
149 return queryState(Finished);
150}
151
152bool QFutureInterfaceBase::isPaused() const
153{
154 return queryState(Paused);
155}
156
157bool QFutureInterfaceBase::isThrottled() const
158{
159 return queryState(Throttled);
160}
161
162bool QFutureInterfaceBase::isResultReadyAt(int index) const
163{
164 QMutexLocker lock(&d->m_mutex);
165 return d->internal_isResultReadyAt(index);
166}
167
168bool QFutureInterfaceBase::waitForNextResult()
169{
170 QMutexLocker lock(&d->m_mutex);
171 return d->internal_waitForNextResult();
172}
173
174void QFutureInterfaceBase::waitForResume()
175{
176 // return early if possible to avoid taking the mutex lock.
177 if ((d->state & Paused) == false || (d->state & Canceled))
178 return;
179
180 QMutexLocker lock(&d->m_mutex);
181 if ((d->state & Paused) == false || (d->state & Canceled))
182 return;
183
184 // decrease active thread count since this thread will wait.
185 QThreadPool::globalInstance()->releaseThread();
186
187 d->pausedWaitCondition.wait(&d->m_mutex);
188
189 QThreadPool::globalInstance()->reserveThread();
190}
191
192int QFutureInterfaceBase::progressValue() const
193{
194 return d->m_progressValue;
195}
196
197int QFutureInterfaceBase::progressMinimum() const
198{
199 return d->m_progressMinimum;
200}
201
202int QFutureInterfaceBase::progressMaximum() const
203{
204 return d->m_progressMaximum;
205}
206
207int QFutureInterfaceBase::resultCount() const
208{
209 QMutexLocker lock(&d->m_mutex);
210 return d->internal_resultCount();
211}
212
213QString QFutureInterfaceBase::progressText() const
214{
215 QMutexLocker locker(&d->m_mutex);
216 return d->m_progressText;
217}
218
219bool QFutureInterfaceBase::isProgressUpdateNeeded() const
220{
221 QMutexLocker locker(&d->m_mutex);
222 return (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond));
223}
224
225void QFutureInterfaceBase::reportStarted()
226{
227 QMutexLocker locker(&d->m_mutex);
228 if ((d->state & Started) || (d->state & Canceled) || (d->state & Finished))
229 return;
230
231 d->setState(State(Started | Running));
232 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started));
233}
234
235void QFutureInterfaceBase::reportCanceled()
236{
237 cancel();
238}
239
240#ifndef QT_NO_EXCEPTIONS
241void QFutureInterfaceBase::reportException(const QtConcurrent::Exception &exception)
242{
243 QMutexLocker locker(&d->m_mutex);
244 if ((d->state & Canceled) || (d->state & Finished))
245 return;
246
247 d->m_exceptionStore.setException(exception);
248 d->state = State(d->state | Canceled);
249 d->waitCondition.wakeAll();
250 d->pausedWaitCondition.wakeAll();
251 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
252}
253#endif
254
255void QFutureInterfaceBase::reportFinished()
256{
257 QMutexLocker locker(&d->m_mutex);
258 if (!(d->state & Finished)) {
259 d->state = State((d->state & ~Running) | Finished);
260 d->waitCondition.wakeAll();
261 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
262 }
263}
264
265void QFutureInterfaceBase::setExpectedResultCount(int resultCount)
266{
267 if (d->manualProgress == false)
268 setProgressRange(0, resultCount);
269 d->m_expectedResultCount = resultCount;
270}
271
272int QFutureInterfaceBase::expectedResultCount()
273{
274 return d->m_expectedResultCount;
275}
276
277bool QFutureInterfaceBase::queryState(State state) const
278{
279 return (d->state & state);
280}
281
282void QFutureInterfaceBase::waitForResult(int resultIndex)
283{
284 d->m_exceptionStore.throwPossibleException();
285
286 if (!(d->state & Running))
287 return;
288
289 // To avoid deadlocks and reduce the number of threads used, try to
290 // run the runnable in the current thread.
291 QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable);
292
293 QMutexLocker lock(&d->m_mutex);
294
295 if (!(d->state & Running))
296 return;
297
298 const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
299 while ((d->state & Running) && d->internal_isResultReadyAt(waitIndex) == false)
300 d->waitCondition.wait(&d->m_mutex);
301
302 d->m_exceptionStore.throwPossibleException();
303}
304
305void QFutureInterfaceBase::waitForFinished()
306{
307 if (d->state & Running) {
308 QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable);
309
310 QMutexLocker lock(&d->m_mutex);
311
312 while (d->state & Running)
313 d->waitCondition.wait(&d->m_mutex);
314 }
315
316 d->m_exceptionStore.throwPossibleException();
317}
318
319void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex)
320{
321 if ((d->state & Canceled) || (d->state & Finished) || beginIndex == endIndex)
322 return;
323
324 d->waitCondition.wakeAll();
325
326 if (d->manualProgress == false) {
327 if (d->internal_updateProgress(d->m_progressValue + endIndex - beginIndex) == false) {
328 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
329 beginIndex,
330 endIndex));
331 return;
332 }
333
334 d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
335 d->m_progressValue,
336 d->m_progressText),
337 QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
338 beginIndex,
339 endIndex));
340 return;
341 }
342 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex));
343}
344
345void QFutureInterfaceBase::setRunnable(QRunnable *runnable)
346{
347 d->runnable = runnable;
348}
349
350void QFutureInterfaceBase::setFilterMode(bool enable)
351{
352 QMutexLocker locker(&d->m_mutex);
353 resultStoreBase().setFilterMode(enable);
354}
355
356void QFutureInterfaceBase::setProgressRange(int minimum, int maximum)
357{
358 QMutexLocker locker(&d->m_mutex);
359 d->m_progressMinimum = minimum;
360 d->m_progressMaximum = maximum;
361 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum));
362}
363
364void QFutureInterfaceBase::setProgressValue(int progressValue)
365{
366 setProgressValueAndText(progressValue, QString());
367}
368
369void QFutureInterfaceBase::setProgressValueAndText(int progressValue,
370 const QString &progressText)
371{
372 QMutexLocker locker(&d->m_mutex);
373 if (d->manualProgress == false)
374 d->manualProgress = true;
375 if (d->m_progressValue >= progressValue)
376 return;
377
378 if ((d->state & Canceled) || (d->state & Finished))
379 return;
380
381 if (d->internal_updateProgress(progressValue, progressText)) {
382 d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
383 d->m_progressValue,
384 d->m_progressText));
385 }
386}
387
388QMutex *QFutureInterfaceBase::mutex() const
389{
390 return &d->m_mutex;
391}
392
393QtConcurrent::internal::ExceptionStore &QFutureInterfaceBase::exceptionStore()
394{
395 return d->m_exceptionStore;
396}
397
398QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase()
399{
400 return d->m_results;
401}
402
403const QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const
404{
405 return d->m_results;
406}
407
408QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other)
409{
410 other.d->refCount.ref();
411 if (!d->refCount.deref())
412 delete d;
413 d = other.d;
414 return *this;
415}
416
417bool QFutureInterfaceBase::referenceCountIsOne() const
418{
419 return d->refCount == 1;
420}
421
422QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState)
423 : refCount(1), m_progressValue(0), m_progressMinimum(0), m_progressMaximum(0),
424 state(initialState), progressTimeStarted(false), pendingResults(0),
425 manualProgress(false), m_expectedResultCount(0), runnable(0)
426{ }
427
428int QFutureInterfaceBasePrivate::internal_resultCount() const
429{
430 return m_results.count(); // ### subtract canceled results.
431}
432
433bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const
434{
435 return (m_results.contains(index));
436}
437
438bool QFutureInterfaceBasePrivate::internal_waitForNextResult()
439{
440 if (m_results.hasNextResult())
441 return true;
442
443 while ((state & QFutureInterfaceBase::Running) && m_results.hasNextResult() == false)
444 waitCondition.wait(&m_mutex);
445
446 return (!(state & QFutureInterfaceBase::Canceled) && m_results.hasNextResult());
447}
448
449bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress,
450 const QString &progressText)
451{
452 if (m_progressValue >= progress)
453 return false;
454
455 m_progressValue = progress;
456 m_progressText = progressText;
457
458 if (progressTimeStarted == true && m_progressValue != m_progressMaximum) // make sure the first and last steps are emitted.
459 if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond))
460 return false;
461
462 progressTime.start();
463 progressTimeStarted = true;
464 return true;
465}
466
467void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable)
468{
469 // bail out if we are not changing the state
470 if ((enable && (state & QFutureInterfaceBase::Throttled))
471 || (!enable && !(state & QFutureInterfaceBase::Throttled)))
472 return;
473
474 // change the state
475 if (enable) {
476 state = QFutureInterfaceBase::State(state | QFutureInterfaceBase::Throttled);
477 } else {
478 state = QFutureInterfaceBase::State(state & ~QFutureInterfaceBase::Throttled);
479 if (!(state & QFutureInterfaceBase::Paused))
480 pausedWaitCondition.wakeAll();
481 }
482}
483
484void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent)
485{
486 if (outputConnections.isEmpty())
487 return;
488
489 for (int i = 0; i < outputConnections.count(); ++i)
490 outputConnections.at(i)->postCallOutEvent(callOutEvent);
491}
492
493void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1,
494 const QFutureCallOutEvent &callOutEvent2)
495{
496 if (outputConnections.isEmpty())
497 return;
498
499 for (int i = 0; i < outputConnections.count(); ++i) {
500 QFutureCallOutInterface *interface = outputConnections.at(i);
501 interface->postCallOutEvent(callOutEvent1);
502 interface->postCallOutEvent(callOutEvent2);
503 }
504}
505
506// This function connects an output interface (for example a QFutureWatcher)
507// to this future. While holding the lock we check the state and ready results
508// and add the appropriate callouts to the queue. In order to avoid deadlocks,
509// the actual callouts are made at the end while not holding the lock.
510void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
511{
512 QMutexLocker locker(&m_mutex);
513
514 if (state & QFutureInterfaceBase::Started) {
515 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
516 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
517 m_progressMinimum,
518 m_progressMaximum));
519 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
520 m_progressValue,
521 m_progressText));
522 }
523
524 QtConcurrent::ResultIteratorBase it = m_results.begin();
525 while (it != m_results.end()) {
526 const int begin = it.resultIndex();
527 const int end = begin + it.batchSize();
528 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
529 begin,
530 end));
531 it.batchedAdvance();
532 }
533
534 if (state & QFutureInterfaceBase::Paused)
535 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
536
537 if (state & QFutureInterfaceBase::Canceled)
538 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
539
540 if (state & QFutureInterfaceBase::Finished)
541 interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
542
543 outputConnections.append(interface);
544}
545
546void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
547{
548 QMutexLocker lock(&m_mutex);
549 const int index = outputConnections.indexOf(interface);
550 if (index == -1)
551 return;
552 outputConnections.removeAt(index);
553
554 interface->callOutInterfaceDisconnected();
555}
556
557void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
558{
559 state = newState;
560}
561
562QT_END_NAMESPACE
563
564#endif // QT_NO_QFUTURE
Note: See TracBrowser for help on using the repository browser.