A Simplistic TCP Finite State Machine (FSM) in Python


The problem

Automatons, or Finite State Machines (FSM), are extraordinarily helpful to programmers in the case of software program design. You’ll be given a simplistic model of an FSM to code for a fundamental TCP session.

The end result of this train can be to return the proper state of the TCP FSM based mostly on the array of occasions given.

The enter array of occasions will include a number of of the next strings:

APP_PASSIVE_OPEN, APP_ACTIVE_OPEN, APP_SEND, APP_CLOSE, APP_TIMEOUT, RCV_SYN, RCV_ACK, RCV_SYN_ACK, RCV_FIN, RCV_FIN_ACK

The states are as follows and must be returned in all capital letters as proven:

CLOSED, LISTEN, SYN_SENT, SYN_RCVD, ESTABLISHED, CLOSE_WAIT, LAST_ACK, FIN_WAIT_1, FIN_WAIT_2, CLOSING, TIME_WAIT

The enter can be an array of occasions. Your job is to traverse the FSM as decided by the occasions, and return the right state as a string, all caps, as proven above.

If an occasion just isn’t relevant to the present state, your code will return "ERROR".

Motion of every occasion upon every state:

(the format is INITIAL_STATE: EVENT -> NEW_STATE)

CLOSED: APP_PASSIVE_OPEN -> LISTEN
CLOSED: APP_ACTIVE_OPEN  -> SYN_SENT
LISTEN: RCV_SYN          -> SYN_RCVD
LISTEN: APP_SEND         -> SYN_SENT
LISTEN: APP_CLOSE        -> CLOSED
SYN_RCVD: APP_CLOSE      -> FIN_WAIT_1
SYN_RCVD: RCV_ACK        -> ESTABLISHED
SYN_SENT: RCV_SYN        -> SYN_RCVD
SYN_SENT: RCV_SYN_ACK    -> ESTABLISHED
SYN_SENT: APP_CLOSE      -> CLOSED
ESTABLISHED: APP_CLOSE   -> FIN_WAIT_1
ESTABLISHED: RCV_FIN     -> CLOSE_WAIT
FIN_WAIT_1: RCV_FIN      -> CLOSING
FIN_WAIT_1: RCV_FIN_ACK  -> TIME_WAIT
FIN_WAIT_1: RCV_ACK      -> FIN_WAIT_2
CLOSING: RCV_ACK         -> TIME_WAIT
FIN_WAIT_2: RCV_FIN      -> TIME_WAIT
TIME_WAIT: APP_TIMEOUT   -> CLOSED
CLOSE_WAIT: APP_CLOSE    -> LAST_ACK
LAST_ACK: RCV_ACK        -> CLOSED

Examples

["APP_PASSIVE_OPEN", "APP_SEND", "RCV_SYN_ACK"] =>  "ESTABLISHED"

["APP_ACTIVE_OPEN"] =>  "SYN_SENT"

["APP_ACTIVE_OPEN", "RCV_SYN_ACK", "APP_CLOSE", "RCV_FIN_ACK", "RCV_ACK"] =>  "ERROR"

See wikipedia web page Transmission Management Protocol for additional particulars.

The answer in Python code

Possibility 1:

STATE_TO_COMMANDS = {
  'CLOSED': {
    'APP_PASSIVE_OPEN': 'LISTEN',
    'APP_ACTIVE_OPEN': 'SYN_SENT'
  },
  'LISTEN': {
    'RCV_SYN': 'SYN_RCVD',
    'APP_SEND': 'SYN_SENT',
    'APP_CLOSE': 'CLOSED'
  },
  'SYN_RCVD': {
    'APP_CLOSE': 'FIN_WAIT_1',
    'RCV_ACK': 'ESTABLISHED'
  },
  'SYN_SENT': {
    'RCV_SYN': 'SYN_RCVD',
    'RCV_SYN_ACK': 'ESTABLISHED',
    'APP_CLOSE': 'CLOSED'
  },
  'ESTABLISHED': {
    'APP_CLOSE': 'FIN_WAIT_1',
    'RCV_FIN': 'CLOSE_WAIT'
  },
  'FIN_WAIT_1': {
    'RCV_FIN': 'CLOSING',
    'RCV_FIN_ACK': 'TIME_WAIT',
    'RCV_ACK': 'FIN_WAIT_2'
  },
  'CLOSING': {
    'RCV_ACK': 'TIME_WAIT'
  },
  'FIN_WAIT_2': {
    'RCV_FIN': 'TIME_WAIT'
  },
  'TIME_WAIT': {
    'APP_TIMEOUT': 'CLOSED'
  },
  'CLOSE_WAIT': {
    'APP_CLOSE': 'LAST_ACK'
  },
  'LAST_ACK': {
    'RCV_ACK': 'CLOSED'
  }
}


def traverse_TCP_states(occasions):
  state = "CLOSED"  # preliminary state, all the time
  for occasion in occasions:
    if occasion not in STATE_TO_COMMANDS[state]:
      return 'ERROR'
    state = STATE_TO_COMMANDS[state][event]
  return state

Possibility 2:

STATES = {"CLOSED":     {"APP_PASSIVE_OPEN": "LISTEN", "APP_ACTIVE_OPEN": "SYN_SENT"},
          "LISTEN":     {"RCV_SYN": "SYN_RCVD", "APP_SEND": "SYN_SENT", "APP_CLOSE": "CLOSED"},
          "SYN_RCVD":   {"APP_CLOSE": "FIN_WAIT_1", "RCV_ACK": "ESTABLISHED"},
          "SYN_SENT":   {"RCV_SYN": "SYN_RCVD", "RCV_SYN_ACK": "ESTABLISHED", "APP_CLOSE": "CLOSED"},
          "ESTABLISHED":{"APP_CLOSE": "FIN_WAIT_1", "RCV_FIN": "CLOSE_WAIT"},
          "FIN_WAIT_1": {"RCV_FIN": "CLOSING", "RCV_FIN_ACK": "TIME_WAIT", "RCV_ACK": "FIN_WAIT_2"},
          "CLOSING":    {"RCV_ACK": "TIME_WAIT"},
          "FIN_WAIT_2": {"RCV_FIN": "TIME_WAIT"},
          "TIME_WAIT":  {"APP_TIMEOUT": "CLOSED"},
          "CLOSE_WAIT": {"APP_CLOSE": "LAST_ACK"},
          "LAST_ACK":   {"RCV_ACK": "CLOSED"},
         }


def traverse_TCP_states(occasions):
    state = "CLOSED"
    strive:
        for e in occasions:
            state = STATES[state][e]
        return state
    besides KeyError:
        return "ERROR"

Possibility 3:

def traverse_TCP_states(occasions, state='CLOSED'):

    for occasion in occasions:
        state = {('CLOSED', 'APP_PASSIVE_OPEN'):'LISTEN',
                 ('CLOSED', 'APP_ACTIVE_OPEN'): 'SYN_SENT',
                 ('LISTEN', 'RCV_SYN'):'SYN_RCVD',
                 ('LISTEN', 'APP_SEND'):'SYN_SENT',
                 ('LISTEN', 'APP_CLOSE'):'CLOSED', 
                 ('SYN_RCVD', 'APP_CLOSE'):'FIN_WAIT_1',
                 ('SYN_RCVD', 'RCV_ACK'):'ESTABLISHED',
                 ('SYN_SENT', 'RCV_SYN'):'SYN_RCVD',
                 ('SYN_SENT', 'RCV_SYN_ACK'):'ESTABLISHED',
                 ('SYN_SENT', 'APP_CLOSE'):'CLOSED',
                 ('ESTABLISHED', 'APP_CLOSE'):'FIN_WAIT_1',
                 ('ESTABLISHED', 'RCV_FIN'):'CLOSE_WAIT',
                 ('FIN_WAIT_1', 'RCV_FIN'):'CLOSING',
                 ('FIN_WAIT_1', 'RCV_FIN_ACK'):'TIME_WAIT',
                 ('FIN_WAIT_1', 'RCV_ACK'):'FIN_WAIT_2',
                 ('CLOSING', 'RCV_ACK'):'TIME_WAIT',
                 ('FIN_WAIT_2', 'RCV_FIN'):'TIME_WAIT',
                 ('TIME_WAIT', 'APP_TIMEOUT'):'CLOSED',
                 ('CLOSE_WAIT', 'APP_CLOSE'):'LAST_ACK',
                 ('LAST_ACK', 'RCV_ACK'):'CLOSED'}.get((state, occasion), 'ERROR')
    return state

Check instances to validate our answer

take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN"]), "CLOSE_WAIT")
take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN",  "RCV_SYN","RCV_ACK"]), "ESTABLISHED")    
take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN","APP_CLOSE"]), "LAST_ACK")
take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN"]), "SYN_SENT")
take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","APP_SEND"]), "ERROR")

Further take a look at instances

@take a look at.describe("Fastened assessments")
def fixed_tests():
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN",  "RCV_SYN","RCV_ACK",   "APP_CLOSE"]),"FIN_WAIT_1")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN",  "RCV_SYN","RCV_ACK"]), "ESTABLISHED")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN",  "RCV_SYN"]), "SYN_RCVD")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN"]), "LISTEN")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","APP_CLOSE"]), "CLOSED")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_FIN","RCV_ACK"]), "TIME_WAIT")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_FIN","RCV_ACK","APP_TIMEOUT"]), "CLOSED")
    take a look at.assert_equals(traverse_TCP_states(["RCV_SYN","RCV_ACK","APP_CLOSE"]),"ERROR")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_ACK"]), "FIN_WAIT_2")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN"]), "CLOSE_WAIT")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","RCV_FIN","APP_CLOSE"]), "LAST_ACK")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN"]), "SYN_SENT")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","APP_CLOSE"]), "CLOSED")
    take a look at.assert_equals(traverse_TCP_states(["APP_ACTIVE_OPEN","RCV_SYN_ACK","APP_CLOSE"]), "FIN_WAIT_1")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_PASSIVE_OPEN"]), "ERROR")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","RCV_FIN_ACK","APP_TIMEOUT","APP_ACTIVE_OPEN","RCV_SYN","APP_CLOSE","RCV_FIN","RCV_ACK"]), "TIME_WAIT")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","RCV_SYN"]), "ERROR")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","APP_CLOSE","RCV_SYN"]), "ERROR")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE"]), "FIN_WAIT_1")
    take a look at.assert_equals(traverse_TCP_states(["APP_PASSIVE_OPEN","RCV_SYN","RCV_ACK","APP_CLOSE","RCV_FIN"]), "CLOSING")


@take a look at.describe("Random assessments")
def random_tests():

    from random import randint as rand, alternative

    STATES = {"CLOSED":     {"APP_PASSIVE_OPEN": "LISTEN", "APP_ACTIVE_OPEN": "SYN_SENT"},
              "LISTEN":     {"RCV_SYN": "SYN_RCVD", "APP_SEND": "SYN_SENT", "APP_CLOSE": "CLOSED"},
              "SYN_RCVD":   {"APP_CLOSE": "FIN_WAIT_1", "RCV_ACK": "ESTABLISHED"},
              "SYN_SENT":   {"RCV_SYN": "SYN_RCVD", "RCV_SYN_ACK": "ESTABLISHED", "APP_CLOSE": "CLOSED"},
              "ESTABLISHED":{"APP_CLOSE": "FIN_WAIT_1", "RCV_FIN": "CLOSE_WAIT"},
              "FIN_WAIT_1": {"RCV_FIN": "CLOSING", "RCV_FIN_ACK": "TIME_WAIT", "RCV_ACK": "FIN_WAIT_2"},
              "CLOSING":    {"RCV_ACK": "TIME_WAIT"},
              "FIN_WAIT_2": {"RCV_FIN": "TIME_WAIT"},
              "TIME_WAIT":  {"APP_TIMEOUT": "CLOSED"},
              "CLOSE_WAIT": {"APP_CLOSE": "LAST_ACK"},
              "LAST_ACK":   {"RCV_ACK": "CLOSED"},
             }

    START, ERROR = 'CLOSED', 'ERROR'
    ALL_CMDS = "APP_PASSIVE_OPEN, APP_ACTIVE_OPEN, APP_SEND, APP_CLOSE, APP_TIMEOUT, RCV_SYN, RCV_ACK, RCV_SYN_ACK, RCV_FIN, RCV_FIN_ACK".cut up(", ")

    def doTest(nTest, cmdMin, cmdMax, endProba):
        for _ in vary(nTest):
            state, cmds = START, []

            for _ in vary(rand(cmdMin,cmdMax)):
                endIt = rand(0,100) > endProba
                final  = alternative(ALL_CMDS if endIt else checklist(STATES[state].keys()) )
                state = STATES[state].get(final, ERROR)
                cmds.append(final)
                if endIt: break

            take a look at.assert_equals(traverse_TCP_states(cmds), state)

    doTest(100,2,4,80)
    doTest(100,10,50,98)

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles