Connection part three – Receiving messages

In the previous posts, all we did was construct and send messages to another node on the network. In this post, we’ll see what happens to incoming messages.

First stop – The ReceiverManager:

class ReceiverManager(Thread):
    def __init__(self, sock):
        Thread.__init__(self)
        self.sendingQueue = Utils.globals.sendingQueue
        self.sock = sock
        self.ping = ""

        self.outfile = open("data_received_from_node.txt", 'w')

    def run(self):
        while True:
            try:

                # get only the message's header
                header = self.sock.recv(24)

                if len(header) <= 0:
                    raise Exception("Node disconnected (received 0bit length message)")

                headerStream = BytesIO(header)
                parsedHeader = HeaderParser(headerStream)

                # get the payload
                payload = self.recvall(parsedHeader.payload_size)
                payloadStream = BytesIO(payload)

                self.manager(parsedHeader, payloadStream)

            except Exception as e:
                print(e)
                break

        print("Exit receiver Thread")

The ReceiverManager always runs in the background, checking our socket for any incoming packets. Once it receives a packet, it immediately reads its first 24 bytes.

header = self.sock.recv(24)

The first 24 bytes are the header. If you remember from this post, every Bitcoin message starts with a header, and the header is always exactly 24 bytes long.

 

The first 24 bytes are the header. The rest is the payload.

This header is now wrapped in a stream of bytes (BytesIO) and passed to the HeaderParser class in Bitpy/Network/HeaderParser.py

headerStream = BytesIO(header)
parsedHeader = HeaderParser(headerStream)

 

Second stop – The HeaderParser class:

The HeaderParser class takes the first 24 bytes as a long string of bytes, and then it reads them in the same order that we’ve seen before.

Size (Bytes) | Name | Data type | Description
4 | Start string | char[4] | The network identifier
12 | Command name | char[12] | The name of the command
4 | Payload size | uint32 | len(payload)
4 | Checksum | char[4] | SHA256(SHA256(payload))[:4]

First 4 bytes for the Start string (or Magic number), another 12 bytes for Command name, the next 4 bytes are the Payload size and the last 4 bytes are the checksum.

4 bytes for starting string. 12 for command name. 4 for payload size and 4 for checksum
class HeaderParser:
    def __init__(self, header):  # header is a byte stream (e.g. BytesIO)

        self.magic = read_hexa(header.read(4))
        self.command = header.read(12)
        self.payload_size = read_uint32(header.read(4))
        self.checksum = read_hexa(header.read(4))

        self.header_size = 4 + 12 + 4 + 4

    def to_string(self):
        display = "\n-------------HEADER-------------"
        display += "\nMagic:\t %s" % self.magic
        display += "\nCommand name	:\t %s" % self.command
        display += "\nPayload size	:\t %s" % self.payload_size
        display += "\nChecksum	:\t\t %s" % self.checksum
        display += "\nheader Size:\t\t %s" % self.header_size
        display += "\n"
        return display

We’ve also defined the to_string function which basically makes it easier to print a human readable version of the message header.
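
For comparison, the same 24-byte layout can also be unpacked in one call with Python’s standard struct module. This is only a sketch and not the project’s HeaderParser: the names HEADER_FORMAT and parse_header are made up for this illustration, and the sample bytes are the header of an empty «verack» message on the main network.

```python
import struct

# magic (4 bytes), command (12 bytes), payload size (little-endian uint32), checksum (4 bytes)
HEADER_FORMAT = "<4s12sI4s"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 4 + 12 + 4 + 4


def parse_header(raw):
    """Split a raw 24-byte header into its four fields (hypothetical helper)."""
    if len(raw) != HEADER_SIZE:
        raise ValueError("expected %d header bytes, got %d" % (HEADER_SIZE, len(raw)))
    magic, command, payload_size, checksum = struct.unpack(HEADER_FORMAT, raw)
    return {
        "magic": magic.hex(),
        # the command name is padded with NUL bytes up to 12 bytes
        "command": command.rstrip(b"\x00").decode("ascii"),
        "payload_size": payload_size,
        "checksum": checksum.hex(),
    }


# Header of an empty "verack" message on mainnet.
sample = (bytes.fromhex("f9beb4d9") + b"verack".ljust(12, b"\x00")
          + struct.pack("<I", 0) + bytes.fromhex("5df6e0e2"))
parsed = parse_header(sample)
```

The stream-based class above reads field by field, which is easier to follow; the struct version is just more compact.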

You might’ve noticed that our code currently accepts the checksum field from the received message without checking it. This is of course a flaw in our code. The checksum field is there to help us verify the integrity of the message: by recomputing it over the payload, we can detect a message that was corrupted on its way from the sender node to our node. (Since anyone can recompute the checksum, it does not protect against deliberate tampering.) But for the time being we’ll assume that the message arrived intact and we’ll accept the checksum as is.
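
Checking the checksum would only take a few lines. The sketch below is not part of the project code: compute_checksum and checksum_matches are hypothetical helpers, and they assume the checksum from the header is available as its raw 4 bytes (our HeaderParser stores it through read_hexa, so it might need converting first).

```python
import hashlib


def compute_checksum(payload):
    """Return the first 4 bytes of SHA256(SHA256(payload))."""
    return hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]


def checksum_matches(payload, header_checksum):
    """Compare the raw 4-byte checksum from the header with our own computation."""
    return compute_checksum(payload) == header_checksum


# A message with an empty payload (such as "verack") has this checksum.
empty_checksum = compute_checksum(b"").hex()
```

A receiver could call checksum_matches right after reading the payload and drop the message when it returns False.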

 

Third stop – Back to the ReceiverManager:

Now that we have our header, it’s time to get the payload. The size of the payload was defined in the header of the message. We need to read that number of bytes from the socket, just as we read the first 24 bytes of the header. There’s however one extra step in our code. Instead of calling the built-in sock.recv function directly (as we did for the header), we’ve implemented our own recvall function. The rationale is that sock.recv(n) returns at most n bytes, and on a TCP socket it may return fewer when a large payload arrives in several pieces. recvall therefore keeps calling sock.recv and appending the parts until the whole payload has been received. This has nothing to do with the Bitcoin protocol; it’s only our way to make sure that the code will properly handle large messages.

def recvall(self, length):
    parts = []

    while length > 0:
        part = self.sock.recv(length)
        if not part:
            raise EOFError('socket closed with %d bytes left in this part' % length)

        length -= len(part)
        parts.append(part)

    return b''.join(parts)
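
To see why the loop matters, here is a small self-contained sketch that is not part of the project: recv_exact is a hypothetical standalone version of recvall, and a local socket pair stands in for the connection to the remote node. The sender writes 24 bytes in two pieces, and the reader still gets all of them back as one block. The same short-read behaviour can in principle affect the 24-byte header, so a helper like this could be used for the header as well.

```python
import socket


def recv_exact(sock, length):
    """Read exactly `length` bytes, or raise EOFError if the peer closes first."""
    parts = []
    remaining = length
    while remaining > 0:
        part = sock.recv(remaining)  # may return fewer bytes than requested
        if not part:
            raise EOFError("socket closed with %d bytes left to read" % remaining)
        parts.append(part)
        remaining -= len(part)
    return b"".join(parts)


# Demo on a local socket pair: the data is sent in two separate writes.
left, right = socket.socketpair()
left.sendall(b"A" * 10)
left.sendall(b"B" * 14)
header = recv_exact(right, 24)
left.close()
right.close()
```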

So now, after we’ve read the required number of bytes that represent the payload of our message, we have both our header (which was already parsed) and our payload (yet to be parsed), and we’ll pass them both to the ReceiverManager’s manager function.

 

Fourth stop – Manager:

    
def manager(self, parsedHeader, payloadStream):

    command = parsedHeader.command.decode("utf-8")
    message = {"timestamp": time.time(), "command": command, "header": parsedHeader.to_string(), "payload": ""}


    if command.startswith('ping'):
        ping = Ping.DecodePing(payloadStream)

        pong = Pong.EncodePong(ping.nonce)
        packet = PacketCreator(pong)
        self.sendingQueue.put(packet.forge_packet())

        message["payload"] = str(ping.nonce)
        self.display(message)

    elif command.startswith('inv'):
        inv = Inv.DecodeInv(payloadStream)
        message["payload"] = inv.get_decoded_info()
        self.display(message)

    elif command.startswith('addr'):
        addr = Addr.DecodeAddr(payloadStream)
        message["payload"] = addr.get_decoded_info()
        self.display(message)

    elif command.startswith('pong'):
        pong = Pong.DecodePong(payloadStream)
        message["payload"] = pong.get_decoded_info()
        self.display(message)

    elif command.startswith('version'):
        version = Version.DecodeVersion(payloadStream)
        message["payload"] = version.get_decoded_info()
        self.display(message)

The manager function does a very simple thing. It checks the command of the message (the command is part of the header) and then it sends the message payload to be parsed by the corresponding class. For example, if the manager sees that the command is «pong», it will use the DecodePong class in Bitpy/Packets/control_messages/pong.py to extract the desired fields out of it. (You can read more about «pong», «ping» and «verack» messages in this post.)
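
As the number of supported commands grows, the if/elif chain could be replaced by a lookup table. The sketch below only illustrates that idea under some assumptions: the handler functions are stand-ins invented for this example (in the project they would call the Decode classes), and the command is compared exactly after stripping its NUL padding instead of using startswith.

```python
def normalize_command(raw_command):
    """Strip the NUL padding from the 12-byte command field."""
    return raw_command.rstrip(b"\x00").decode("ascii")


# Hypothetical handlers; real ones would parse the payload stream.
def handle_ping(payload):
    return "ping with %d payload bytes" % len(payload)


def handle_pong(payload):
    return "pong with %d payload bytes" % len(payload)


HANDLERS = {
    "ping": handle_ping,
    "pong": handle_pong,
}


def dispatch(raw_command, payload):
    """Look up the handler for a command and run it on the payload."""
    command = normalize_command(raw_command)
    handler = HANDLERS.get(command)
    if handler is None:
        return "ignored unknown command %r" % command
    return handler(payload)


result = dispatch(b"ping" + b"\x00" * 8, b"\x01" * 8)
```

An exact match also avoids accidentally treating a longer command name as a shorter one that happens to share its prefix.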

 

Divergence

We have our parsed message, both its header and payload, and now we need to decide what to do with them. For some messages this might be the end of the line: there’s nothing more we can do with them. Others require us to act. A «ping» message should be answered by a «pong» message, transactions should be checked and relayed (we’ll talk about transactions in later posts), and «version» messages should be acknowledged by sending back a «verack» message.
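
To make the «ping» case concrete, here is a rough standalone sketch of what the reply looks like on the wire. It does not use the project’s Pong and PacketCreator classes: build_message and pong_for are hypothetical helpers, and the main network magic value f9beb4d9 is assumed.

```python
import hashlib
import struct

MAINNET_MAGIC = bytes.fromhex("f9beb4d9")  # assumption: we talk to the main network


def build_message(command, payload):
    """Prefix a payload with its 24-byte header."""
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    # "12s" pads the command name with NUL bytes up to 12 bytes
    header = struct.pack("<4s12sI4s", MAINNET_MAGIC, command.encode("ascii"),
                         len(payload), checksum)
    return header + payload


def pong_for(ping_payload):
    """A pong echoes the 8-byte nonce of the ping it answers."""
    return build_message("pong", ping_payload[:8])


ping_payload = struct.pack("<Q", 1234)  # example nonce
reply = pong_for(ping_payload)
```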

A major part of learning the Bitcoin protocol is learning how each and every message should be dealt with: which fields of information it contains and what this information means. We’ve already talked about some of the messages in previous posts (see here for «ping», «pong» and «verack» messages, and here for the «version» message), and as our project gains more features, we’ll discuss other types of messages and how to deal with them.

Messages part two – Payloads and version message

In the previous posts we’ve talked a little bit about messages. We know that a message is nothing more than a string of bytes, that it has a header and a body (payload), and that it must maintain its predefined format. We’ve seen the format of the header, but every message body (payload) will contain different information, according to the message type. We’ll start by constructing the “version” message.

The version message is used when trying to establish a connection with the remote node. Alice will send the version message to Bob, and only after Bob has approved this version message and replied with his own version message can the connection between the two nodes be established. No other message will be accepted before both nodes have exchanged this version message. So it’s no surprise that we chose to construct this message first, since this is the first message that we’ll send, and the first one we’ll receive.

 

The fields that are required in our version message (From the developer reference):

 

Size (Bytes) | Name | Data type | Description
4 | version | int32 | The latest version of the protocol that the transmitting node (our node) understands. In this example this number is 70012.
8 | services | uint64 | The services supported by the transmitting node: 0x00 for a node that is not a full node, 0x01 for a full node.
8 | timestamp | int64 | The current timestamp.
8 | addr_recv services | uint64 | The services supported by the receiving node, as perceived by the transmitting node.
16 | addr_recv IP address | char[16] | The IP address of the receiving node, as perceived by the transmitting node.
2 | addr_recv port | uint16 | The port of the receiving node, as perceived by the transmitting node.
8 | addr_trans services | uint64 | The services supported by the transmitting node (our node).
16 | addr_trans IP address | char[16] | The IP address of the transmitting node (our node).
2 | addr_trans port | uint16 | The port of the transmitting node (our node).
8 | nonce | uint64 | A random number that helps a node detect a connection to itself.
Varies | user_agent bytes | CompactSize | This field varies in size; it tells the other node the size of the next field.
Varies | user_agent | string | This field is used to display the name of our node, like licence plates. We can call our node whatever we want: “core”, “classic”, “my_cool_bitcoin_thingy”.
4 | start_height | int32 | The highest block that the transmitting node knows of.
1 | relay | bool | True: the transmitting node wants the remote node to announce new transactions to it. False: the remote node should not announce new transactions to it.
  • Pay attention that in the “version” message, the addr_recv fields describe the receiving (remote) node as we see it, while the addr_trans fields describe our own node. Our code fills both with the same local values, which keeps the example simple; other implementations fill in the actual addresses and might include more advanced routing.
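
The field sizes in the table can be checked by packing a version payload with the standard struct module. This is a sketch under assumptions and not the project’s EncodeVersion class: pack_net_addr and build_version_payload are hypothetical names, the addresses are local placeholders, IPv4 addresses are assumed to be stored in IPv4-mapped IPv6 form, and the user agent is left empty.

```python
import random
import socket
import struct
import time


def pack_net_addr(services, ipv4, port):
    """services (uint64, little-endian) + 16-byte IP + port (uint16, big-endian)."""
    mapped_ip = b"\x00" * 10 + b"\xff\xff" + socket.inet_aton(ipv4)
    return struct.pack("<Q", services) + mapped_ip + struct.pack(">H", port)


def build_version_payload(version=70012, services=0, start_height=0, relay=False):
    """Pack the version fields in the order given by the table."""
    payload = struct.pack("<iQq", version, services, int(time.time()))
    payload += pack_net_addr(services, "127.0.0.1", 8333)   # addr_recv (placeholder)
    payload += pack_net_addr(services, "127.0.0.1", 8333)   # addr_trans (placeholder)
    payload += struct.pack("<Q", random.getrandbits(64))    # nonce
    payload += b"\x00"                                      # user_agent bytes = 0, no user_agent
    payload += struct.pack("<i?", start_height, relay)      # start_height, relay
    return payload


payload = build_version_payload()
payload_size = len(payload)  # 4 + 8 + 8 + 26 + 26 + 8 + 1 + 4 + 1
```

With an empty user agent the payload comes out at 86 bytes; a non-empty user agent adds its own length on top.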

Code implementation

Alexis and I decided that every message will have its own file in which the payload of the message will be both created (for messages that our node will send) and parsed (for incoming messages).

import random
import time
from io import BytesIO
from Utils.config import version_number, latest_known_block
from Utils.dataTypes import *


class EncodeVersion:
    def __init__(self):
        self.command_name = "version"

        self.version = to_int32(version_number)
        self.services = to_uint64(0)
        self.timestamp = to_int64(time.time())

        self.addr_recv_services = to_uint64(0)
        self.addr_recv_ip = to_big_endian_16char("127.0.0.1")
        self.addr_recv_port = to_big_endian_uint16(8333)

        self.addr_trans_services = to_uint64(0)
        self.addr_trans_ip = to_big_endian_16char("127.0.0.1")
        self.addr_trans_port = to_big_endian_uint16(8333)

        self.nonce = to_uint64(random.getrandbits(64))
        self.user_agent_bytes = to_uchar(0)
        self.starting_height = to_int32(latest_known_block)
        self.relay = to_bool(False)

    def forge(self):
        return self.version + self.services + self.timestamp + \
               self.addr_recv_services + self.addr_recv_ip + self.addr_recv_port + \
               self.addr_trans_services + self.addr_trans_ip + self.addr_trans_port + \
               self.nonce + self.user_agent_bytes + self.starting_height + \
               self.relay


class DecodedVersion:
    def __init__(self, payload):
        self.version = read_int32(payload.read(4))
        self.services = read_uint64(payload.read(8))
        self.timestamp = read_int64(payload.read(8))

        self.addr_recv_services = read_uint64(payload.read(8))
        self.addr_recv_ip = parse_ip(payload.read(16))
        self.addr_recv_port = read_big_endian_uint16(payload.read(2))

        self.addr_trans_services = read_uint64(payload.read(8))
        self.addr_trans_ip = parse_ip(payload.read(16))
        self.addr_trans_port = read_big_endian_uint16(payload.read(2))

        self.nonce = read_uint64(payload.read(8))

        self.user_agent_bytes = read_compactSize_uint(BytesIO(payload.read(1)))
        self.user_agent = read_char(payload.read(self.user_agent_bytes), self.user_agent_bytes)

        self.starting_height = read_int32(payload.read(4))
        self.relay = read_bool(payload.read(1))

    def get_decoded_info(self):
        display = "\n-----Version-----"
        display += "\nversion                :\t\t %s" % self.version
        display += "\nservices  	         :\t\t %s" % self.services
        display += "\ntimestamp              :\t\t %s" % self.timestamp

        display += "\naddr_recv_services	 :\t\t %s" % self.addr_recv_services
        display += "\naddr_recv_ip           :\t\t %s" % self.addr_recv_ip
        display += "\naddr_recv_port         :\t\t %s" % self.addr_recv_port

        display += "\naddr_trans_services  	:\t\t %s" % self.addr_trans_services
        display += "\naddr_trans_ip         :\t\t %s" % self.addr_trans_ip
        display += "\naddr_trans_port	    :\t\t %s" % self.addr_trans_port

        display += "\nnonce                 :\t\t %s" % self.nonce

        display += "\nuser_agent_bytes  	:\t\t %s" % self.user_agent_bytes
        display += "\nuser_agent            :\t\t %s" % self.user_agent
        display += "\nstarting_height	    :\t\t %s" % self.starting_height
        display += "\nrelay	                :\t\t %s" % self.relay

        return display

Because this code is used both for incoming and outgoing messages, it has both the class EncodeVersion, which is used to build the payload of the version message, and the class DecodedVersion which is used to parse the payload of any incoming version message.

The function forge will just append and return all the fields in the right order – this is the final payload.

 

EncodeVersion

Because we haven’t established a connection yet, we first need to create the payload of our version message using the EncodeVersion class. The class doesn’t take any arguments (except for self) and just assigns every field the right value and the right data type.

The variables version_number and latest_known_block are imported from Bitpy/Utils/config.py and are set to:

version_number = 70012
latest_known_block = 416419  # june 2016

Our node is not a full node so the services will be set to 0x00. We’ll also set our relay field to False, which tells the remote node not to announce new transactions to us.

Strictly speaking, the addr_recv fields describe the remote node and the addr_trans fields describe our own node. To keep things simple, our code fills both with the same local values:

self.addr_recv_services = to_uint64(0)
self.addr_recv_ip = to_big_endian_16char("127.0.0.1")
self.addr_recv_port = to_big_endian_uint16(8333)

self.addr_trans_services = to_uint64(0)
self.addr_trans_ip = to_big_endian_16char("127.0.0.1")
self.addr_trans_port = to_big_endian_uint16(8333)

 

We’re using the function random.getrandbits(64) in order to populate our nonce field with an 8-byte-long random number.

We’re also not adding any vanity name to our node at this time, so we’re setting user_agent_bytes to 0. That means that there’s no user_agent field.
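
If we did want to add a name later, the user agent would be written as a CompactSize length followed by the string itself. The following is a sketch of that encoding, not project code: encode_compact_size and encode_user_agent are hypothetical helpers, and the name is assumed to be plain ASCII.

```python
import struct


def encode_compact_size(n):
    """Encode an unsigned integer as a Bitcoin CompactSize."""
    if n < 0xfd:
        return struct.pack("<B", n)            # 1 byte
    if n <= 0xffff:
        return b"\xfd" + struct.pack("<H", n)  # marker + uint16
    if n <= 0xffffffff:
        return b"\xfe" + struct.pack("<I", n)  # marker + uint32
    return b"\xff" + struct.pack("<Q", n)      # marker + uint64


def encode_user_agent(name):
    """Return user_agent_bytes followed by the user_agent string."""
    raw = name.encode("ascii")
    return encode_compact_size(len(raw)) + raw


no_name = encode_user_agent("")
named = encode_user_agent("/Classic:0.12.0/")
```

An empty name encodes to the single byte 0x00, which is exactly what to_uchar(0) produces in our class.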

 

 

DecodedVersion

This class is quite straightforward: it receives the payload of the incoming message from Bitpy/Manager/ReceiverManager.py.

It uses the built-in function read and our data type functions to assign each field the proper value. For example, the first 4 bytes are the version number in int32 format, the next 8 bytes are the services field in uint64 format, and so on.

The only thing that is really unique is the user_agent field:

self.user_agent_bytes = read_compactSize_uint(BytesIO(payload.read(1)))
self.user_agent = read_char(payload.read(self.user_agent_bytes), self.user_agent_bytes)

This is the first example of the variable-length CompactSize data type in use. The field user_agent_bytes doesn’t have a fixed size. The Bitcoin protocol defines the CompactSize data type to deal with such fields (you can read more about this data type in the data types section). We’re using BytesIO in order to pass this byte as a stream to the read_compactSize_uint function, which returns the unsigned integer that matches the size of the next field, the user_agent field. Then we’re using the data type function read_char, which requires two arguments. The first is the string of bytes itself (payload.read(self.user_agent_bytes)) and the second is the total size of the string (self.user_agent_bytes).
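
For reference, here is a sketch of how a CompactSize value can be read from a stream. It is not the project’s read_compactSize_uint: read_compact_size and read_var_str are hypothetical names. Note that our code hands the parser a single byte, which is enough as long as the user agent is shorter than 253 bytes; passing the whole payload stream, as assumed here, would also cover the longer encodings.

```python
import struct
from io import BytesIO


def read_compact_size(stream):
    """Read a CompactSize unsigned integer from a byte stream."""
    first = stream.read(1)[0]
    if first < 0xfd:
        return first                                    # the byte is the value itself
    if first == 0xfd:
        return struct.unpack("<H", stream.read(2))[0]   # next 2 bytes
    if first == 0xfe:
        return struct.unpack("<I", stream.read(4))[0]   # next 4 bytes
    return struct.unpack("<Q", stream.read(8))[0]       # next 8 bytes


def read_var_str(stream):
    """Read a CompactSize length followed by that many ASCII bytes."""
    length = read_compact_size(stream)
    return stream.read(length).decode("ascii")


user_agent = read_var_str(BytesIO(b"\x10/Classic:0.12.0/"))
```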

Once we’ve finished parsing our version message, we can use the get_decoded_info function in order to display the information about the remote node (currently, we aren’t doing anything with this information except to dump it as a text file).

-----Version-----
version : 70012
services : 5
timestamp : 1467293151
addr_recv_services : 1
addr_recv_ip : ��^�V�
addr_recv_port : 30373
addr_trans_services : 5
addr_trans_ip : ��
addr_trans_port : 8333
nonce : 1755461931592560680
user_agent_bytes : 16
user_agent : /Classic:0.12.0/
starting_height : 418653
relay : True
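
Some of these raw values become more readable with a little post-processing. The sketch below is an illustration only and not part of the project: the helper names are made up, the service flag names follow the ones used by Bitcoin Core, and the IP helper assumes the field holds 16 raw bytes (the unreadable characters above are presumably such raw bytes printed as text).

```python
import ipaddress
from datetime import datetime, timezone

# Service bits as named in Bitcoin Core (only a few are listed here).
SERVICE_FLAGS = {
    0x01: "NODE_NETWORK",
    0x02: "NODE_GETUTXO",
    0x04: "NODE_BLOOM",
}


def describe_services(services):
    """List the names of the service bits that are set."""
    names = [name for bit, name in sorted(SERVICE_FLAGS.items()) if services & bit]
    return names or ["none"]


def format_ip(raw16):
    """Render a 16-byte address field, as IPv4 when it is IPv4-mapped."""
    address = ipaddress.IPv6Address(raw16)
    return str(address.ipv4_mapped or address)


def format_timestamp(seconds):
    """Render a Unix timestamp as a UTC date and time."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


services = describe_services(5)
when = format_timestamp(1467293151)
local = format_ip(b"\x00" * 10 + b"\xff\xff" + bytes([127, 0, 0, 1]))
```

Applied to the output above, services 5 would read as a full node that also supports bloom filters, and the timestamp falls on 30 June 2016.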

 

Edit (4-Jul-2016): Python 2.5 to 3.5 migration

Please read the general notes about the transition from Python 2.5 to 3.5 over here. And the complete github change log for the migration over here.

The code for the <Version> message remained fairly untouched; only a few adjustments were required:

class EncodeVersion:
///
self.timestamp = to_int64(int(time.time()))
///
self.addr_recv_ip = to_big_endian_16char(b"127.0.0.1")
///
self.addr_trans_ip = to_big_endian_16char(b"127.0.0.1")
///
class DecodedVersion:
///
self.user_agent = read_chars(payload.read(self.user_agent_bytes), self.user_agent_bytes)
///