Ignore:
Timestamp:
Sep 30, 2009, 6:08:53 AM (16 years ago)
Author:
Dmitry A. Kuminov
Message:

corelib: Improved socket notifiers: don't block other threads by constantly posting "socket ready" messages to the message queue in cases when the target thread isn't fast enough to process them. This fixes spontaneous various odd behavior (partial lines, unexpected child termination due to EPIPE) when redirecting child process output in QProcess.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/corelib/kernel/qeventdispatcher_pm.cpp

    r175 r196  
    284284    static void addSelect(QSocketNotifier *notifier, HWND hwnd);
    285285    static void removeSelect(QSocketNotifier *notifier);
    286     static QSocketNotifier *getSocketNotifier(int key);
     286    static QSocketNotifier *getSocketNotifier(int key);
    287287
    288288    // timer support is based on QTimerInfoList from the Unix implementation
     
    300300    ~QSelectThread();
    301301
     302
     303
     304
     305
    302306    void run();
    303307    void cancelSelectOrIdle();
     
    322326    // socket stuff
    323327
    324     typedef QHash<int, QSocketNotifier*> Sockets;
     328    typedef QPair<QSocketNotifier*, HWND> SockNot;
     329    typedef QHash<int, SockNot> Sockets;
    325330    Sockets sockets;
     331
     332
    326333    int maxSockfd;
    327 
    328     enum Op { Add, Remove };
    329     struct PendingSockOp {
    330         Op op;
    331         int sockfd;
    332         Type type;
    333         HWND hwnd;
    334     };
    335 
    336     typedef QList<PendingSockOp> PendingSockets;
    337     PendingSockets pendingSockets;
    338334
    339335    // timer stuff
     
    402398
    403399    }
    404     instance->sockets.insert(key, notifier);
    405     PendingSockOp op = {Add, sockfd, type, hwnd};
    406     instance->pendingSockets.append(op);
     400    instance->sockets.insert(key, );
     401    ;
     402    instance->);
    407403    instance->cancelSelectOrIdle();
    408404}
     
    420416    if (instance->sockets.contains(key)) {
    421417        instance->sockets.remove(key);
    422         PendingSockOp op = {Remove, sockfd, type};
    423         instance->pendingSockets.append(op);
     418        ;
     419        instance->);
    424420        instance->cancelSelectOrIdle();
    425421    }
     
    436432*/
    437433// static
    438 QSocketNotifier *QSelectThread::getSocketNotifier(int key)
     434QSocketNotifier *QSelectThread::getSocketNotifier(int key)
    439435{
    440436    QMutexLocker locker(&mutex);
    441437    Q_ASSERT(instance);
    442438
    443     if (instance->sockets.contains(key) &&
    444         instance->sockets[key]->thread() == QThread::currentThread())
    445         return instance->sockets[key];
     439    if (instance->sockets.contains(key)) {
     440        QSocketNotifier* notifier = instance->sockets[key].first;
     441        if (notifier->thread() == QThread::currentThread()) {
     442            if (reset && notifier->isEnabled()) {
     443                // add the socket back to the set
     444                int sockfd = notifier->socket();
     445                fd_set *set = instance->setForType(notifier->type());
     446                FD_SET(sockfd, set);
     447                instance->updateMaxSockFd(sockfd, Add);
     448                // inform the select thread that this socket may be included
     449                // in the set and posted again
     450                instance->cancelSelectOrIdle();
     451            }
     452            return notifier;
     453        }
     454    }
    446455
    447456    return 0;
     
    577586            } else {
    578587                // also wake it up if this timer was skipped when choosing the
    579                 // shortest wait interval so that a longer once could be chosen
     588                // shortest wait interval so that a longer one could be chosen
    580589                bool haveNonPosted = false;
    581590                for (TimevalMap::const_iterator it = instance->timersByTimeout.begin();
     
    647656    for (Sockets::iterator it = instance->sockets.begin();
    648657          it != instance->sockets.end();) {
    649         QSocketNotifier *notifier = it.value();
     658        QSocketNotifier *notifier = it.value();
    650659        if (notifier->thread() == QThread::currentThread()) {
    651             PendingSockOp op = {Remove, notifier->socket(), notifier->type()};
    652             instance->pendingSockets.append(op);
    653660            it = instance->sockets.erase(it);
     661
     662
    654663        } else {
    655664            ++it;
     
    694703{
    695704    // initialize socket stuff
     705
     706
     707
    696708    maxSockfd = -1;
    697709
     
    720732}
    721733
     734
     735
     736
     737
     738
     739
     740
     741
     742
     743
     744
     745
     746
     747
     748
     749
     750
     751
     752
     753
     754
     755
     756
     757
     758
     759
     760
     761
     762
     763
     764
     765
     766
     767
     768
     769
     770
    722771void QSelectThread::run()
    723772{
    724     // maintain a separate hash for HWNDs to avoid mutex locking every time
    725     // select() returns an event that we want to post
    726     typedef QHash<int, HWND> Hwnds;
    727     Hwnds hwnds;
    728 
    729     fd_set readS, writeS, exS;
    730     FD_ZERO(&readS);
    731     FD_ZERO(&writeS);
    732     FD_ZERO(&exS);
    733 
    734773    mutex.lock();
    735774
    736775    do {
    737         // process pending socket operations
    738         while (!pendingSockets.isEmpty()) {
    739             PendingSockOp p = pendingSockets.takeFirst();
    740             switch (p.op) {
    741                 case Add:
    742                     switch (p.type) {
    743                         case QSocketNotifier::Read:
    744                             FD_SET(p.sockfd, &readS); break;
    745                         case QSocketNotifier::Write:
    746                             FD_SET(p.sockfd, &writeS); break;
    747                         case QSocketNotifier::Exception:
    748                             FD_SET(p.sockfd, &exS); break;
    749                     }
    750                     hwnds.insert(toSockKey(p.sockfd, p.type), p.hwnd);
    751                     maxSockfd = qMax(maxSockfd, p.sockfd);
    752                     break;
    753                 case Remove:
    754                     switch (p.type) {
    755                         case QSocketNotifier::Read:
    756                             FD_CLR(p.sockfd, &readS); break;
    757                         case QSocketNotifier::Write:
    758                             FD_CLR(p.sockfd, &writeS); break;
    759                         case QSocketNotifier::Exception:
    760                             FD_CLR(p.sockfd, &exS); break;
    761                     }
    762                     hwnds.remove(toSockKey(p.sockfd, p.type));
    763                     if (maxSockfd == p.sockfd) {
    764                         // find the new hignest socket
    765                         maxSockfd = -1;
    766                         if (!hwnds.isEmpty()) {
    767                             for (Hwnds::const_iterator it = hwnds.constBegin();
    768                                   it != hwnds.constEnd(); ++it) {
    769                                 maxSockfd = qMax(toSocket(it.key()), maxSockfd);
    770                             }
    771                         }
    772                     }
    773                     break;
    774             }
    775         }
    776 
    777776        // get the maximum time we can wait (for the closest timer)
    778777        timeval *timeout = 0;
     
    787786            mutex.unlock();
    788787            nsel = ::select(maxSockfd + 1, &tmpRead, &tmpWrite, &tmpEx, timeout);
     788
    789789            if (nsel > 0) {
    790                 for (Hwnds::const_iterator it = hwnds.constBegin();
    791                       it != hwnds.constEnd(); ++it) {
     790                // find out which sockets to post. Note that we remove these
     791                // sockets from the main sets to avoid polluting the message
     792                // queue with sockett messages if the target window is not fast
     793                // enough to process them. They will be put back once processed.
     794                for (Sockets::const_iterator it = sockets.constBegin();
     795                      it != sockets.constEnd(); ++it) {
    792796                    int sockfd = toSocket(it.key());
     797
    793798                    bool isSet = false;
    794                     switch (toSockType(it.key())) {
     799                    switch (t) {
    795800                        case QSocketNotifier::Read:
    796801                            isSet = FD_ISSET(sockfd, &tmpRead); break;
     
    800805                            isSet = FD_ISSET(sockfd, &tmpEx); break;
    801806                    }
    802                     if (isSet)
    803                         WinPostMsg(it.value(), WM_U_SEM_SELECT, MPFROMLONG(it.key()), 0);
     807                    if (isSet) {
     808                        fd_set *set = setForType(type);
     809                        FD_CLR(sockfd, set);
     810                        updateMaxSockFd(sockfd, Remove);
     811                        WinPostMsg(it.value().second, WM_U_SEM_SELECT, MPFROMLONG(it.key()), 0);
     812                    }
    804813                }
    805814            }
    806             mutex.lock();
    807815        } else {
    808816            nsel = -1;
     
    10351043    }
    10361044    static void removeSelect(QSocketNotifier *notifier) {}
    1037     static QSocketNotifier *getSocketNotifier(int key); { return 0; }
     1045    static QSocketNotifier *getSocketNotifier(int key); { return 0; }
    10381046
    10391047    static void addTimer(int timerId, int interval, QObject *object, HWND hwnd) {
     
    11471155        case WM_U_SEM_SELECT: {
    11481156            QSocketNotifier *notifier =
    1149                 QSelectThread::getSocketNotifier(LONGFROMMP(mp1));
     1157                QSelectThread::getSocketNotifier(LONGFROMMP(mp1));
    11501158            if (notifier) {
    11511159                QEvent event(QEvent::SockAct);
Note: See TracChangeset for help on using the changeset viewer.