source: trunk/examples/network/torrent/peerwireclient.cpp@ 561

Last change on this file since 561 was 561, checked in by Dmitry A. Kuminov, 15 years ago

trunk: Merged in qt 4.6.1 sources.

File size: 20.9 KB
Line 
1/****************************************************************************
2**
3** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
4** All rights reserved.
5** Contact: Nokia Corporation ([email protected])
6**
7** This file is part of the examples of the Qt Toolkit.
8**
9** $QT_BEGIN_LICENSE:LGPL$
10** Commercial Usage
11** Licensees holding valid Qt Commercial licenses may use this file in
12** accordance with the Qt Commercial License Agreement provided with the
13** Software or, alternatively, in accordance with the terms contained in
14** a written agreement between you and Nokia.
15**
16** GNU Lesser General Public License Usage
17** Alternatively, this file may be used under the terms of the GNU Lesser
18** General Public License version 2.1 as published by the Free Software
19** Foundation and appearing in the file LICENSE.LGPL included in the
20** packaging of this file. Please review the following information to
21** ensure the GNU Lesser General Public License version 2.1 requirements
22** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
23**
24** In addition, as a special exception, Nokia gives you certain additional
25** rights. These rights are described in the Nokia Qt LGPL Exception
26** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
27**
28** GNU General Public License Usage
29** Alternatively, this file may be used under the terms of the GNU
30** General Public License version 3.0 as published by the Free Software
31** Foundation and appearing in the file LICENSE.GPL included in the
32** packaging of this file. Please review the following information to
33** ensure the GNU General Public License version 3.0 requirements will be
34** met: http://www.gnu.org/copyleft/gpl.html.
35**
36** If you have questions regarding the use of this file, please contact
37** Nokia at [email protected].
38** $QT_END_LICENSE$
39**
40****************************************************************************/
41
42#include "peerwireclient.h"
43
44#include <QHostAddress>
45#include <QTimerEvent>
46
47static const int PendingRequestTimeout = 60 * 1000;
48static const int ClientTimeout = 120 * 1000;
49static const int ConnectTimeout = 60 * 1000;
50static const int KeepAliveInterval = 30 * 1000;
51static const int RateControlTimerDelay = 2000;
52static const int MinimalHeaderSize = 48;
53static const int FullHeaderSize = 68;
54static const char ProtocolId[] = "BitTorrent protocol";
55static const char ProtocolIdSize = 19;
56
57// Reads a 32bit unsigned int from data in network order.
58static inline quint32 fromNetworkData(const char *data)
59{
60 const unsigned char *udata = (const unsigned char *)data;
61 return (quint32(udata[0]) << 24)
62 | (quint32(udata[1]) << 16)
63 | (quint32(udata[2]) << 8)
64 | (quint32(udata[3]));
65}
66
67// Writes a 32bit unsigned int from num to data in network order.
68static inline void toNetworkData(quint32 num, char *data)
69{
70 unsigned char *udata = (unsigned char *)data;
71 udata[3] = (num & 0xff);
72 udata[2] = (num & 0xff00) >> 8;
73 udata[1] = (num & 0xff0000) >> 16;
74 udata[0] = (num & 0xff000000) >> 24;
75}
76
77// Constructs an unconnected PeerWire client and starts the connect timer.
78PeerWireClient::PeerWireClient(const QByteArray &peerId, QObject *parent)
79 : QTcpSocket(parent), pendingBlockSizes(0),
80 pwState(ChokingPeer | ChokedByPeer), receivedHandShake(false), gotPeerId(false),
81 sentHandShake(false), nextPacketLength(-1), pendingRequestTimer(0), invalidateTimeout(false),
82 keepAliveTimer(0), torrentPeer(0)
83{
84 memset(uploadSpeedData, 0, sizeof(uploadSpeedData));
85 memset(downloadSpeedData, 0, sizeof(downloadSpeedData));
86
87 transferSpeedTimer = startTimer(RateControlTimerDelay);
88 timeoutTimer = startTimer(ConnectTimeout);
89 peerIdString = peerId;
90
91 connect(this, SIGNAL(readyRead()), this, SIGNAL(readyToTransfer()));
92 connect(this, SIGNAL(connected()), this, SIGNAL(readyToTransfer()));
93
94 connect(&socket, SIGNAL(connected()),
95 this, SIGNAL(connected()));
96 connect(&socket, SIGNAL(readyRead()),
97 this, SIGNAL(readyRead()));
98 connect(&socket, SIGNAL(disconnected()),
99 this, SIGNAL(disconnected()));
100 connect(&socket, SIGNAL(error(QAbstractSocket::SocketError)),
101 this, SIGNAL(error(QAbstractSocket::SocketError)));
102 connect(&socket, SIGNAL(bytesWritten(qint64)),
103 this, SIGNAL(bytesWritten(qint64)));
104 connect(&socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
105 this, SLOT(socketStateChanged(QAbstractSocket::SocketState)));
106
107}
108
109// Registers the peer ID and SHA1 sum of the torrent, and initiates
110// the handshake.
111void PeerWireClient::initialize(const QByteArray &infoHash, int pieceCount)
112{
113 this->infoHash = infoHash;
114 peerPieces.resize(pieceCount);
115 if (!sentHandShake)
116 sendHandShake();
117}
118
119void PeerWireClient::setPeer(TorrentPeer *peer)
120{
121 torrentPeer = peer;
122}
123
124TorrentPeer *PeerWireClient::peer() const
125{
126 return torrentPeer;
127}
128
129QBitArray PeerWireClient::availablePieces() const
130{
131 return peerPieces;
132}
133
134QList<TorrentBlock> PeerWireClient::incomingBlocks() const
135{
136 return incoming;
137}
138
139// Sends a "choke" message, asking the peer to stop requesting blocks.
140void PeerWireClient::chokePeer()
141{
142 const char message[] = {0, 0, 0, 1, 0};
143 write(message, sizeof(message));
144 pwState |= ChokingPeer;
145
146 // After receiving a choke message, the peer will assume all
147 // pending requests are lost.
148 pendingBlocks.clear();
149 pendingBlockSizes = 0;
150}
151
152// Sends an "unchoke" message, allowing the peer to start/resume
153// requesting blocks.
154void PeerWireClient::unchokePeer()
155{
156 const char message[] = {0, 0, 0, 1, 1};
157 write(message, sizeof(message));
158 pwState &= ~ChokingPeer;
159
160 if (pendingRequestTimer)
161 killTimer(pendingRequestTimer);
162}
163
164// Sends a "keep-alive" message to prevent the peer from closing
165// the connection when there's no activity
166void PeerWireClient::sendKeepAlive()
167{
168 const char message[] = {0, 0, 0, 0};
169 write(message, sizeof(message));
170}
171
172// Sends an "interested" message, informing the peer that it has got
173// pieces that we'd like to download.
174void PeerWireClient::sendInterested()
175{
176 const char message[] = {0, 0, 0, 1, 2};
177 write(message, sizeof(message));
178 pwState |= InterestedInPeer;
179
180 // After telling the peer that we're interested, we expect to get
181 // unchoked within a certain timeframe; otherwise we'll drop the
182 // connection.
183 if (pendingRequestTimer)
184 killTimer(pendingRequestTimer);
185 pendingRequestTimer = startTimer(PendingRequestTimeout);
186}
187
188// Sends a "not interested" message, informing the peer that it does
189// not have any pieces that we'd like to download.
190void PeerWireClient::sendNotInterested()
191{
192 const char message[] = {0, 0, 0, 1, 3};
193 write(message, sizeof(message));
194 pwState &= ~InterestedInPeer;
195}
196
197// Sends a piece notification / a "have" message, informing the peer
198// that we have just downloaded a new piece.
199void PeerWireClient::sendPieceNotification(int piece)
200{
201 if (!sentHandShake)
202 sendHandShake();
203
204 char message[] = {0, 0, 0, 5, 4, 0, 0, 0, 0};
205 toNetworkData(piece, &message[5]);
206 write(message, sizeof(message));
207}
208
209// Sends the complete list of pieces that we have downloaded.
210void PeerWireClient::sendPieceList(const QBitArray &bitField)
211{
212 // The bitfield message may only be sent immediately after the
213 // handshaking sequence is completed, and before any other
214 // messages are sent.
215 if (!sentHandShake)
216 sendHandShake();
217
218 // Don't send the bitfield if it's all zeros.
219 if (bitField.count(true) == 0)
220 return;
221
222 int bitFieldSize = bitField.size();
223 int size = (bitFieldSize + 7) / 8;
224 QByteArray bits(size, '\0');
225 for (int i = 0; i < bitFieldSize; ++i) {
226 if (bitField.testBit(i)) {
227 quint32 byte = quint32(i) / 8;
228 quint32 bit = quint32(i) % 8;
229 bits[byte] = uchar(bits.at(byte)) | (1 << (7 - bit));
230 }
231 }
232
233 char message[] = {0, 0, 0, 1, 5};
234 toNetworkData(bits.size() + 1, &message[0]);
235 write(message, sizeof(message));
236 write(bits);
237}
238
239// Sends a request for a block.
240void PeerWireClient::requestBlock(int piece, int offset, int length)
241{
242 char message[] = {0, 0, 0, 1, 6};
243 toNetworkData(13, &message[0]);
244 write(message, sizeof(message));
245
246 char numbers[4 * 3];
247 toNetworkData(piece, &numbers[0]);
248 toNetworkData(offset, &numbers[4]);
249 toNetworkData(length, &numbers[8]);
250 write(numbers, sizeof(numbers));
251
252 incoming << TorrentBlock(piece, offset, length);
253
254 // After requesting a block, we expect the block to be sent by the
255 // other peer within a certain number of seconds. Otherwise, we
256 // drop the connection.
257 if (pendingRequestTimer)
258 killTimer(pendingRequestTimer);
259 pendingRequestTimer = startTimer(PendingRequestTimeout);
260}
261
262// Cancels a request for a block.
263void PeerWireClient::cancelRequest(int piece, int offset, int length)
264{
265 char message[] = {0, 0, 0, 1, 8};
266 toNetworkData(13, &message[0]);
267 write(message, sizeof(message));
268
269 char numbers[4 * 3];
270 toNetworkData(piece, &numbers[0]);
271 toNetworkData(offset, &numbers[4]);
272 toNetworkData(length, &numbers[8]);
273 write(numbers, sizeof(numbers));
274
275 incoming.removeAll(TorrentBlock(piece, offset, length));
276}
277
278// Sends a block to the peer.
279void PeerWireClient::sendBlock(int piece, int offset, const QByteArray &data)
280{
281 QByteArray block;
282
283 char message[] = {0, 0, 0, 1, 7};
284 toNetworkData(9 + data.size(), &message[0]);
285 block += QByteArray(message, sizeof(message));
286
287 char numbers[4 * 2];
288 toNetworkData(piece, &numbers[0]);
289 toNetworkData(offset, &numbers[4]);
290 block += QByteArray(numbers, sizeof(numbers));
291 block += data;
292
293 BlockInfo blockInfo;
294 blockInfo.pieceIndex = piece;
295 blockInfo.offset = offset;
296 blockInfo.length = data.size();
297 blockInfo.block = block;
298
299 pendingBlocks << blockInfo;
300 pendingBlockSizes += block.size();
301
302 if (pendingBlockSizes > 32 * 16384) {
303 chokePeer();
304 unchokePeer();
305 return;
306 }
307 emit readyToTransfer();
308}
309
310// Attempts to write 'bytes' bytes to the socket from the buffer.
311// This is used by RateController, which precisely controls how much
312// each client can write.
313qint64 PeerWireClient::writeToSocket(qint64 bytes)
314{
315 qint64 totalWritten = 0;
316 do {
317 if (outgoingBuffer.isEmpty() && !pendingBlocks.isEmpty()) {
318 BlockInfo block = pendingBlocks.takeFirst();
319 pendingBlockSizes -= block.length;
320 outgoingBuffer += block.block;
321 }
322 qint64 written = socket.write(outgoingBuffer.constData(),
323 qMin<qint64>(bytes - totalWritten, outgoingBuffer.size()));
324 if (written <= 0)
325 return totalWritten ? totalWritten : written;
326
327 totalWritten += written;
328 uploadSpeedData[0] += written;
329 outgoingBuffer.remove(0, written);
330 } while (totalWritten < bytes && (!outgoingBuffer.isEmpty() || !pendingBlocks.isEmpty()));
331
332 return totalWritten;
333}
334
335// Attempts to read at most 'bytes' bytes from the socket.
336qint64 PeerWireClient::readFromSocket(qint64 bytes)
337{
338 char buffer[1024];
339 qint64 totalRead = 0;
340 do {
341 qint64 bytesRead = socket.read(buffer, qMin<qint64>(sizeof(buffer), bytes - totalRead));
342 if (bytesRead <= 0)
343 break;
344 qint64 oldSize = incomingBuffer.size();
345 incomingBuffer.resize(oldSize + bytesRead);
346 memcpy(incomingBuffer.data() + oldSize, buffer, bytesRead);
347
348 totalRead += bytesRead;
349 } while (totalRead < bytes);
350
351 if (totalRead > 0) {
352 downloadSpeedData[0] += totalRead;
353 emit bytesReceived(totalRead);
354 processIncomingData();
355 }
356 return totalRead;
357}
358
359// Returns the average number of bytes per second this client is
360// downloading.
361qint64 PeerWireClient::downloadSpeed() const
362{
363 qint64 sum = 0;
364 for (unsigned int i = 0; i < sizeof(downloadSpeedData) / sizeof(qint64); ++i)
365 sum += downloadSpeedData[i];
366 return sum / (8 * 2);
367}
368
369// Returns the average number of bytes per second this client is
370// uploading.
371qint64 PeerWireClient::uploadSpeed() const
372{
373 qint64 sum = 0;
374 for (unsigned int i = 0; i < sizeof(uploadSpeedData) / sizeof(qint64); ++i)
375 sum += uploadSpeedData[i];
376 return sum / (8 * 2);
377}
378
379void PeerWireClient::setReadBufferSize(int size)
380{
381 socket.setReadBufferSize(size);
382}
383
384bool PeerWireClient::canTransferMore() const
385{
386 return bytesAvailable() > 0 || socket.bytesAvailable() > 0
387 || !outgoingBuffer.isEmpty() || !pendingBlocks.isEmpty();
388}
389
390void PeerWireClient::connectToHostImplementation(const QString &hostName,
391 quint16 port, OpenMode openMode)
392
393{
394 setOpenMode(openMode);
395 socket.connectToHost(hostName, port, openMode);
396}
397
398void PeerWireClient::diconnectFromHostImplementation()
399{
400 socket.disconnectFromHost();
401}
402
403void PeerWireClient::timerEvent(QTimerEvent *event)
404{
405 if (event->timerId() == transferSpeedTimer) {
406 // Rotate the upload / download records.
407 for (int i = 6; i >= 0; --i) {
408 uploadSpeedData[i + 1] = uploadSpeedData[i];
409 downloadSpeedData[i + 1] = downloadSpeedData[i];
410 }
411 uploadSpeedData[0] = 0;
412 downloadSpeedData[0] = 0;
413 } else if (event->timerId() == timeoutTimer) {
414 // Disconnect if we timed out; otherwise the timeout is
415 // restarted.
416 if (invalidateTimeout) {
417 invalidateTimeout = false;
418 } else {
419 abort();
420 emit infoHashReceived(QByteArray());
421 }
422 } else if (event->timerId() == pendingRequestTimer) {
423 abort();
424 } else if (event->timerId() == keepAliveTimer) {
425 sendKeepAlive();
426 }
427 QTcpSocket::timerEvent(event);
428}
429
430// Sends the handshake to the peer.
431void PeerWireClient::sendHandShake()
432{
433 sentHandShake = true;
434
435 // Restart the timeout
436 if (timeoutTimer)
437 killTimer(timeoutTimer);
438 timeoutTimer = startTimer(ClientTimeout);
439
440 // Write the 68 byte PeerWire handshake.
441 write(&ProtocolIdSize, 1);
442 write(ProtocolId, ProtocolIdSize);
443 write(QByteArray(8, '\0'));
444 write(infoHash);
445 write(peerIdString);
446}
447
448void PeerWireClient::processIncomingData()
449{
450 invalidateTimeout = true;
451 if (!receivedHandShake) {
452 // Check that we received enough data
453 if (bytesAvailable() < MinimalHeaderSize)
454 return;
455
456 // Sanity check the protocol ID
457 QByteArray id = read(ProtocolIdSize + 1);
458 if (id.at(0) != ProtocolIdSize || !id.mid(1).startsWith(ProtocolId)) {
459 abort();
460 return;
461 }
462
463 // Discard 8 reserved bytes, then read the info hash and peer ID
464 (void) read(8);
465
466 // Read infoHash
467 QByteArray peerInfoHash = read(20);
468 if (!infoHash.isEmpty() && peerInfoHash != infoHash) {
469 abort();
470 return;
471 }
472
473 emit infoHashReceived(peerInfoHash);
474 if (infoHash.isEmpty()) {
475 abort();
476 return;
477 }
478
479 // Send handshake
480 if (!sentHandShake)
481 sendHandShake();
482 receivedHandShake = true;
483 }
484
485 // Handle delayed peer id arrival
486 if (!gotPeerId) {
487 if (bytesAvailable() < 20)
488 return;
489 gotPeerId = true;
490 if (read(20) == peerIdString) {
491 // We connected to ourself
492 abort();
493 return;
494 }
495 }
496
497 // Initialize keep-alive timer
498 if (!keepAliveTimer)
499 keepAliveTimer = startTimer(KeepAliveInterval);
500
501 do {
502 // Find the packet length
503 if (nextPacketLength == -1) {
504 if (bytesAvailable() < 4)
505 return;
506
507 char tmp[4];
508 read(tmp, sizeof(tmp));
509 nextPacketLength = fromNetworkData(tmp);
510
511 if (nextPacketLength < 0 || nextPacketLength > 200000) {
512 // Prevent DoS
513 abort();
514 return;
515 }
516 }
517
518 // KeepAlive
519 if (nextPacketLength == 0) {
520 nextPacketLength = -1;
521 continue;
522 }
523
524 // Wait with parsing until the whole packet has been received
525 if (bytesAvailable() < nextPacketLength)
526 return;
527
528 // Read the packet
529 QByteArray packet = read(nextPacketLength);
530 if (packet.size() != nextPacketLength) {
531 abort();
532 return;
533 }
534
535 switch (packet.at(0)) {
536 case ChokePacket:
537 // We have been choked.
538 pwState |= ChokedByPeer;
539 incoming.clear();
540 if (pendingRequestTimer)
541 killTimer(pendingRequestTimer);
542 emit choked();
543 break;
544 case UnchokePacket:
545 // We have been unchoked.
546 pwState &= ~ChokedByPeer;
547 emit unchoked();
548 break;
549 case InterestedPacket:
550 // The peer is interested in downloading.
551 pwState |= PeerIsInterested;
552 emit interested();
553 break;
554 case NotInterestedPacket:
555 // The peer is not interested in downloading.
556 pwState &= ~PeerIsInterested;
557 emit notInterested();
558 break;
559 case HavePacket: {
560 // The peer has a new piece available.
561 quint32 index = fromNetworkData(&packet.data()[1]);
562 if (index < quint32(peerPieces.size())) {
563 // Only accept indexes within the valid range.
564 peerPieces.setBit(int(index));
565 }
566 emit piecesAvailable(availablePieces());
567 break;
568 }
569 case BitFieldPacket:
570 // The peer has the following pieces available.
571 for (int i = 1; i < packet.size(); ++i) {
572 for (int bit = 0; bit < 8; ++bit) {
573 if (packet.at(i) & (1 << (7 - bit))) {
574 int bitIndex = int(((i - 1) * 8) + bit);
575 if (bitIndex >= 0 && bitIndex < peerPieces.size()) {
576 // Occasionally, broken clients claim to have
577 // pieces whose index is outside the valid range.
578 // The most common mistake is the index == size
579 // case.
580 peerPieces.setBit(bitIndex);
581 }
582 }
583 }
584 }
585 emit piecesAvailable(availablePieces());
586 break;
587 case RequestPacket: {
588 // The peer requests a block.
589 quint32 index = fromNetworkData(&packet.data()[1]);
590 quint32 begin = fromNetworkData(&packet.data()[5]);
591 quint32 length = fromNetworkData(&packet.data()[9]);
592 emit blockRequested(int(index), int(begin), int(length));
593 break;
594 }
595 case PiecePacket: {
596 int index = int(fromNetworkData(&packet.data()[1]));
597 int begin = int(fromNetworkData(&packet.data()[5]));
598
599 incoming.removeAll(TorrentBlock(index, begin, packet.size() - 9));
600
601 // The peer sends a block.
602 emit blockReceived(index, begin, packet.mid(9));
603
604 // Kill the pending block timer.
605 if (pendingRequestTimer) {
606 killTimer(pendingRequestTimer);
607 pendingRequestTimer = 0;
608 }
609 break;
610 }
611 case CancelPacket: {
612 // The peer cancels a block request.
613 quint32 index = fromNetworkData(&packet.data()[1]);
614 quint32 begin = fromNetworkData(&packet.data()[5]);
615 quint32 length = fromNetworkData(&packet.data()[9]);
616 for (int i = 0; i < pendingBlocks.size(); ++i) {
617 const BlockInfo &blockInfo = pendingBlocks.at(i);
618 if (blockInfo.pieceIndex == int(index)
619 && blockInfo.offset == int(begin)
620 && blockInfo.length == int(length)) {
621 pendingBlocks.removeAt(i);
622 break;
623 }
624 }
625 break;
626 }
627 default:
628 // Unsupported packet type; just ignore it.
629 break;
630 }
631 nextPacketLength = -1;
632 } while (bytesAvailable() > 0);
633}
634
635void PeerWireClient::socketStateChanged(QAbstractSocket::SocketState state)
636{
637 setLocalAddress(socket.localAddress());
638 setLocalPort(socket.localPort());
639 setPeerName(socket.peerName());
640 setPeerAddress(socket.peerAddress());
641 setPeerPort(socket.peerPort());
642 setSocketState(state);
643}
644
645qint64 PeerWireClient::readData(char *data, qint64 size)
646{
647 int n = qMin<int>(size, incomingBuffer.size());
648 memcpy(data, incomingBuffer.constData(), n);
649 incomingBuffer.remove(0, n);
650 return n;
651}
652
653qint64 PeerWireClient::readLineData(char *data, qint64 maxlen)
654{
655 return QIODevice::readLineData(data, maxlen);
656}
657
658qint64 PeerWireClient::writeData(const char *data, qint64 size)
659{
660 int oldSize = outgoingBuffer.size();
661 outgoingBuffer.resize(oldSize + size);
662 memcpy(outgoingBuffer.data() + oldSize, data, size);
663 emit readyToTransfer();
664 return size;
665}
Note: See TracBrowser for help on using the repository browser.