Monday, December 17, 2012

OpenvSwitch Code Analysis (Part 4)

This section analyzes the mechanism by which ovsd and the datapath communicate with each other over netlink.

The datapath runs in kernel space and ovsd runs in user space; the two communicate through netlink.

How the datapath uses generic netlink

In dp_init() (datapath.c), dp_register_genl() is called to register four generic netlink families and their operations: datapath, vport, flow, and packet. The first three families each support four commands (NEW, DEL, GET, SET), while the packet family supports only EXECUTE.
These families and their operations are all defined in datapath.c.
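As a rough sketch of what this registration amounts to: dp_register_genl() walks a table of family/ops pairs and registers each with the generic netlink subsystem. The table layout and helper names below are approximations, and the real code also registers multicast groups:

/* Simplified sketch of the registration performed by dp_register_genl().
 * The real code in datapath.c keeps the family/ops pairs in a table together
 * with their multicast groups and unwinds everything on failure; the exact
 * table layout here is an approximation. */
struct genl_family_and_ops {
    struct genl_family *family;
    struct genl_ops *ops;
    int n_ops;
};

static const struct genl_family_and_ops dp_genl_families[] = {
    { &dp_datapath_genl_family, dp_datapath_genl_ops,
      ARRAY_SIZE(dp_datapath_genl_ops) },
    { &dp_vport_genl_family, dp_vport_genl_ops,
      ARRAY_SIZE(dp_vport_genl_ops) },
    { &dp_flow_genl_family, dp_flow_genl_ops,
      ARRAY_SIZE(dp_flow_genl_ops) },
    { &dp_packet_genl_family, dp_packet_genl_ops,
      ARRAY_SIZE(dp_packet_genl_ops) },
};

static int dp_register_genl(void)
{
    int i;

    for (i = 0; i < ARRAY_SIZE(dp_genl_families); i++) {
        const struct genl_family_and_ops *f = &dp_genl_families[i];
        int err = genl_register_family_with_ops(f->family, f->ops, f->n_ops);

        if (err) {
            /* Unregister whatever was already registered. */
            while (--i >= 0)
                genl_unregister_family(dp_genl_families[i].family);
            return err;
        }
    }
    return 0;
}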
flow family为例。代码为
static const struct nla_policy flow_policy[OVS_FLOW_ATTR_MAX + 1] = {
   [OVS_FLOW_ATTR_KEY] = { .type = NLA_NESTED },
   [OVS_FLOW_ATTR_ACTIONS] = { .type = NLA_NESTED },
   [OVS_FLOW_ATTR_CLEAR] = { .type = NLA_FLAG },
};

static struct genl_family dp_flow_genl_family = {
   .id = GENL_ID_GENERATE,
   .hdrsize = sizeof(struct ovs_header),
   .name = OVS_FLOW_FAMILY,
   .version = OVS_FLOW_VERSION,
   .maxattr = OVS_FLOW_ATTR_MAX,
    SET_NETNSOK
};
The ops bound to this family are defined as:
static struct genl_ops dp_flow_genl_ops[] = {
   { .cmd = OVS_FLOW_CMD_NEW,
     .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
     .policy = flow_policy,
     .doit = ovs_flow_cmd_new_or_set
   },
   { .cmd = OVS_FLOW_CMD_DEL,
     .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
     .policy = flow_policy,
     .doit = ovs_flow_cmd_del
   },
   { .cmd = OVS_FLOW_CMD_GET,
     .flags = 0,                               /* OK for unprivileged users. */
     .policy = flow_policy,
     .doit = ovs_flow_cmd_get,
     .dumpit = ovs_flow_cmd_dump
   },
   { .cmd = OVS_FLOW_CMD_SET,
     .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
     .policy = flow_policy,
     .doit = ovs_flow_cmd_new_or_set,
   },
};
As we can see, besides the netlink header and the generic netlink header, the nlmsg defined by the datapath also carries a custom ovs_header.
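For reference (and as confirmed later when dpif_linux_flow_to_ofpbuf() is discussed), ovs_header does nothing more than identify the target datapath:

/* include/linux/openvswitch.h: the OVS-specific header that follows the
 * generic netlink header.  It only records which datapath the message is
 * about. */
struct ovs_header {
    int dp_ifindex;
};

/* A complete OVS request/reply is therefore laid out roughly as:
 *   struct nlmsghdr | struct genlmsghdr | struct ovs_header | nlattr ... */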

How ovsd uses netlink

ovsd's netlink handling is implemented mainly in lib/netlink-socket.c. The callers of these netlink operations are mostly in lib/dpif-linux.c (taking dpif_linux_class as the example), where each kind of action is handled; the possible message types were registered with the kernel beforehand by the datapath module.
Because the datapath registered those netlink families, ovsd must look up their information before it can use them. This is done in dpif_linux_init() in lib/dpif-linux.c (again taking dpif_linux_class as the example). The code is:
static int
dpif_linux_init(void)
{
    static int error = -1;

    if (error < 0) {
        unsigned int ovs_vport_mcgroup;

        error = nl_lookup_genl_family(OVS_DATAPATH_FAMILY,
                                      &ovs_datapath_family);
        if (error) {
            VLOG_ERR("Generic Netlink family '%s' does not exist. "
                     "The Open vSwitch kernel module is probably not loaded.",
                     OVS_DATAPATH_FAMILY);
        }
        if (!error) {
            error = nl_lookup_genl_family(OVS_VPORT_FAMILY, &ovs_vport_family);
        }
        if (!error) {
            error = nl_lookup_genl_family(OVS_FLOW_FAMILY, &ovs_flow_family);
        }
        if (!error) {
            error = nl_lookup_genl_family(OVS_PACKET_FAMILY,
                                          &ovs_packet_family);
        }
        if (!error) {
            error = nl_sock_create(NETLINK_GENERIC, &genl_sock);
        }
        if (!error) {
            error = nl_lookup_genl_mcgroup(OVS_VPORT_FAMILY, OVS_VPORT_MCGROUP,
                                           &ovs_vport_mcgroup,
                                           OVS_VPORT_MCGROUP_FALLBACK_ID);
        }
        if (!error) {
            static struct dpif_linux_vport vport;
            nln = nln_create(NETLINK_GENERIC, ovs_vport_mcgroup,
                             dpif_linux_nln_parse, &vport);
        }
    }

    return error;
}
Once these lookups complete, ovsd can use the dpif APIs to operate on the datapath by sending it these netlink messages.
The relevant mid-layer API is defined by the abstract type dpif_class (in lib/dpif-provider.h).
The concrete implementations of this abstract API again come in two flavors: dpif_linux_class (lib/dpif-linux.c) and dpif_netdev_class (lib/dpif-netdev.c). These mid-layer APIs are in turn wrapped by the higher-level APIs in lib/dpif.c.
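To make the layering concrete, here is an abridged sketch of the provider interface; only a few representative function pointers are shown, and the real member list in lib/dpif-provider.h is much longer:

/* Abridged sketch of the dpif provider interface (lib/dpif-provider.h).
 * A backend such as dpif_linux_class or dpif_netdev_class fills in these
 * function pointers, and the high-level API in lib/dpif.c dispatches
 * through them.  Many members are omitted here. */
struct dpif_class {
    const char *type;          /* "system" for dpif-linux, "netdev", ... */

    int (*flow_put)(struct dpif *, const struct dpif_flow_put *);
    int (*flow_del)(struct dpif *, const struct dpif_flow_del *);
    int (*execute)(struct dpif *, const struct dpif_execute *);
    void (*operate)(struct dpif *, struct dpif_op **ops, size_t n_ops);
    /* ... port management, flow dumping, upcall receive, ... */
};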
Here we take dpif_flow_put() (lib/dpif.c) as an example to trace how a netlink message is built and sent. This function tries to set up a flow in the bound datapath. It mainly calls dpif_flow_put__(), whose code is:
static int
dpif_flow_put__(struct dpif *dpif, const struct dpif_flow_put *put)
{
    int error;

    COVERAGE_INC(dpif_flow_put);
    assert(!(put->flags & ~(DPIF_FP_CREATE | DPIF_FP_MODIFY
                            | DPIF_FP_ZERO_STATS)));

    error = dpif->dpif_class->flow_put(dpif, put);
    if (error && put->stats) {
        memset(put->stats, 0, sizeof *put->stats);
    }
    log_flow_put_message(dpif, put, error);
    return error;
}
As shown, it invokes the abstract interface of the concrete dpif_class. Taking dpif_linux_class as the example, that interface is dpif_linux_flow_put(), whose code is:
static int
dpif_linux_flow_put(struct dpif *dpif_, const struct dpif_flow_put *put)
{
    struct dpif_linux_flow request, reply;
    struct ofpbuf *buf;
    int error;

    dpif_linux_init_flow_put(dpif_, put, &request);
    error = dpif_linux_flow_transact(&request,
                                     put->stats ? &reply : NULL,
                                     put->stats ? &buf : NULL);
    if (!error && put->stats) {
        dpif_linux_flow_get_stats(&reply, put->stats);
        ofpbuf_delete(buf);
    }
    return error;
}
The code performs two main steps: it builds the request with dpif_linux_init_flow_put(), then sends the nlmsg with dpif_linux_flow_transact().
dpif_linux_init_flow_put() builds the request message from put; its code is:
static void
dpif_linux_init_flow_put(struct dpif *dpif_, const struct dpif_flow_put *put,
                         struct dpif_linux_flow *request)
{
    static struct nlattr dummy_action;

    struct dpif_linux *dpif = dpif_linux_cast(dpif_);

    dpif_linux_flow_init(request);
    request->cmd = (put->flags & DPIF_FP_CREATE
                    ? OVS_FLOW_CMD_NEW : OVS_FLOW_CMD_SET);
    request->dp_ifindex = dpif->dp_ifindex;
    request->key = put->key;
    request->key_len = put->key_len;
    /* Ensure that OVS_FLOW_ATTR_ACTIONS will always be included. */
    request->actions = put->actions ? put->actions : &dummy_action;
    request->actions_len = put->actions_len;
    if (put->flags & DPIF_FP_ZERO_STATS) {
        request->clear = true;
    }
    request->nlmsg_flags = put->flags & DPIF_FP_MODIFY ? 0 : NLM_F_CREATE;
}
dpif_linux_flow_transact() is responsible for actually sending the nlmsg; its code is:
static int
dpif_linux_flow_transact(struct dpif_linux_flow *request,
                         struct dpif_linux_flow *reply, struct ofpbuf **bufp)
{
    struct ofpbuf *request_buf;
    int error;

    assert((reply != NULL) == (bufp != NULL));

    if (reply) {
        request->nlmsg_flags |= NLM_F_ECHO;
    }

    request_buf = ofpbuf_new(1024);
    dpif_linux_flow_to_ofpbuf(request, request_buf);
    error = nl_sock_transact(genl_sock, request_buf, bufp);
    ofpbuf_delete(request_buf);

    if (reply) {
        if (!error) {
            error = dpif_linux_flow_from_ofpbuf(reply, *bufp);
        }
        if (error) {
            dpif_linux_flow_init(reply);
            ofpbuf_delete(*bufp);
            *bufp = NULL;
        }
    }
    return error;
}
The first parameter of dpif_linux_flow_transact(), request, holds the data for the message to send; if the second and third parameters are non-NULL, they receive any reply that comes back.
The function first calls dpif_linux_flow_to_ofpbuf(), which uses the attributes in request to build struct ofpbuf *request_buf as an ovs_header followed by the netlink attributes; the ovs_header structure (include/linux/openvswitch.h) holds only a dp_ifindex. It then calls nl_sock_transact() to send the message.
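A simplified sketch of what dpif_linux_flow_to_ofpbuf() emits is given below. The stats-related attributes are omitted, and the helpers (nl_msg_put_genlmsghdr(), nl_msg_put_unspec(), nl_msg_put_flag(), ofpbuf_put_uninit()) are OVS's lib/netlink and lib/ofpbuf utilities; treat the exact details as approximate:

/* Simplified sketch of dpif_linux_flow_to_ofpbuf() (lib/dpif-linux.c):
 * genl header, then ovs_header, then the flow attributes. */
static void
dpif_linux_flow_to_ofpbuf(const struct dpif_linux_flow *flow,
                          struct ofpbuf *buf)
{
    struct ovs_header *ovs_header;

    nl_msg_put_genlmsghdr(buf, 0, ovs_flow_family,
                          NLM_F_REQUEST | flow->nlmsg_flags,
                          flow->cmd, OVS_FLOW_VERSION);

    ovs_header = ofpbuf_put_uninit(buf, sizeof *ovs_header);
    ovs_header->dp_ifindex = flow->dp_ifindex;

    if (flow->key_len) {
        nl_msg_put_unspec(buf, OVS_FLOW_ATTR_KEY, flow->key, flow->key_len);
    }
    if (flow->actions || flow->actions_len) {
        nl_msg_put_unspec(buf, OVS_FLOW_ATTR_ACTIONS,
                          flow->actions, flow->actions_len);
    }
    if (flow->clear) {
        nl_msg_put_flag(buf, OVS_FLOW_ATTR_CLEAR);
    }
}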
The code of nl_sock_transact() is:
int
nl_sock_transact(struct nl_sock *sock, const struct ofpbuf *request,
                 struct ofpbuf **replyp)
{
    struct nl_transaction *transactionp;
    struct nl_transaction transaction;

    transaction.request = CONST_CAST(struct ofpbuf *, request);
    transaction.reply = replyp ? ofpbuf_new(1024) : NULL;
    transactionp = &transaction;

    nl_sock_transact_multiple(sock, &transactionp, 1);

    if (replyp) {
        if (transaction.error) {
            ofpbuf_delete(transaction.reply);
            *replyp = NULL;
        } else {
            *replyp = transaction.reply;
        }
    }

    return transaction.error;
}
nl_sock_transact() in turn calls nl_sock_transact_multiple() to send the message.
nl_sock_transact_multiple() takes the socket to send on as its first parameter, the transactions to send as its second, and their count as its third. It mainly calls nl_sock_transact_multiple__() to build the nlmsgs and send them out.
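Each entry passed to nl_sock_transact_multiple() is a struct nl_transaction. Based on the fields used in nl_sock_transact() above, its shape is roughly the following (an abridged sketch, not the full definition in lib/netlink-socket.c):

/* Abridged sketch of struct nl_transaction (lib/netlink-socket.c).  Each
 * transaction pairs one request with the buffer that will receive its reply
 * and the resulting error code. */
struct nl_transaction {
    struct ofpbuf *request;     /* Message to send. */
    struct ofpbuf *reply;       /* Buffer for the kernel's reply, or NULL. */
    int error;                  /* 0 on success, a positive errno on failure. */
};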
Other kinds of operations are handled in a similar way.

Monday, November 26, 2012

OpenvSwitch Code Analysis (Part 3)

This section briefly traces the process from the datapath receiving a packet to its interaction with ovsd.


The dynamic process

The datapath receives a packet

In dp_init(), dp_register_genl() registers the netlink families and ops for the four kinds of events: dp, vport, flow, and packet.
When openvswitch.ko in the kernel receives an instruction to add a bridge, i.e. an OVS_DP_CMD_NEW command on the OVS_DATAPATH_FAMILY channel, the callback bound to that command is ovs_dp_cmd_new. The relevant implementation is in datapath/datapath.c; the key code is:
static struct genl_ops dp_datapath_genl_ops[] = {
   { .cmd = OVS_DP_CMD_NEW,
     .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
     .policy = datapath_policy,
     .doit = ovs_dp_cmd_new
   },
   { .cmd = OVS_DP_CMD_DEL,
     .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
     .policy = datapath_policy,
     .doit = ovs_dp_cmd_del
   },
   { .cmd = OVS_DP_CMD_GET,
     .flags = 0,                               /* OK for unprivileged users. */
     .policy = datapath_policy,
     .doit = ovs_dp_cmd_get,
     .dumpit = ovs_dp_cmd_dump
   },
   { .cmd = OVS_DP_CMD_SET,
     .flags = GENL_ADMIN_PERM, /* Requires CAP_NET_ADMIN privilege. */
     .policy = datapath_policy,
     .doit = ovs_dp_cmd_set,
   },
};
Besides initializing the dp structure, ovs_dp_cmd_new() calls new_vport() to create a new vport, and new_vport() in turn calls ovs_vport_add() to try to create it. The key code is:
static struct vport *new_vport(const struct vport_parms *parms)
{
    struct vport *vport;

    vport = ovs_vport_add(parms);
    if (!IS_ERR(vport)) {
        struct datapath *dp = parms->dp;
        struct hlist_head *head = vport_hash_bucket(dp, vport->port_no);

        hlist_add_head_rcu(&vport->dp_hash_node, head);
        dp_ifinfo_notify(RTM_NEWLINK, vport);
    }
    return vport;
}
ovs_vport_add() checks the vport type and calls the corresponding create() function to build the vport structure. The key code is:
struct vport *ovs_vport_add(const struct vport_parms *parms)
{
    struct vport *vport;
    int err = 0;
    int i;

    ASSERT_RTNL();

    for (i = 0; i < n_vport_types; i++) {
        if (vport_ops_list[i]->type == parms->type) {
            struct hlist_head *bucket;

            vport = vport_ops_list[i]->create(parms);
            if (IS_ERR(vport)) {
                err = PTR_ERR(vport);
                goto out;
            }

            bucket = hash_bucket(ovs_dp_get_net(vport->dp),
                                 vport->ops->get_name(vport));
            hlist_add_head_rcu(&vport->hash_node, bucket);
            return vport;
        }
    }

    err = -EAFNOSUPPORT;

out:
    return ERR_PTR(err);
}
Here, vport_ops_list[] is populated during ovs_vport_init() from base_vport_ops_list. The key code is:
int ovs_vport_init(void)
{
    int err;
    int i;

    dev_table = kzalloc(VPORT_HASH_BUCKETS * sizeof(struct hlist_head),
                        GFP_KERNEL);
    if (!dev_table) {
        err = -ENOMEM;
        goto error;
    }

    vport_ops_list = kmalloc(ARRAY_SIZE(base_vport_ops_list) *
                             sizeof(struct vport_ops *), GFP_KERNEL);
    if (!vport_ops_list) {
        err = -ENOMEM;
        goto error_dev_table;
    }

    /* Create vport_ops_list, templated from base_vport_ops_list. */
    for (i = 0; i < ARRAY_SIZE(base_vport_ops_list); i++) {
        const struct vport_ops *new_ops = base_vport_ops_list[i]; /* check each vport_ops instance */

        if (new_ops->init)
            err = new_ops->init(); /* init each vport_ops */
        else
            err = 0;

        if (!err)
            vport_ops_list[n_vport_types++] = new_ops;
        else if (new_ops->flags & VPORT_F_REQUIRED) {
            ovs_vport_exit();
            goto error;
        }
    }

    return 0;

error_dev_table:
    kfree(dev_table);
error:
    return err;
}
base_vport_ops_list[] currently has five members:
/* List of statically compiled vport implementations.  Don't forget to also
 * add yours to the list at the bottom of vport.h. */
static const struct vport_ops *base_vport_ops_list[] = {
   &ovs_netdev_vport_ops, //netdev instance
   &ovs_internal_vport_ops,
   &ovs_patch_vport_ops,
   &ovs_gre_vport_ops,
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,26)
   &ovs_capwap_vport_ops,
#endif
};
Therefore, when a vport is of the netdev type, the functions defined in ovs_netdev_vport_ops are used, including init, create, and so on. The function list is:
const struct vport_ops ovs_netdev_vport_ops = {
    .type           = OVS_VPORT_TYPE_NETDEV,
    .flags          = VPORT_F_REQUIRED,
    .init           = netdev_init,
    .exit           = netdev_exit,
    .create         = netdev_create,
    .destroy        = netdev_destroy,
    .set_addr       = ovs_netdev_set_addr,
    .get_name       = ovs_netdev_get_name,
    .get_addr       = ovs_netdev_get_addr,
    .get_kobj       = ovs_netdev_get_kobj,
    .get_dev_flags  = ovs_netdev_get_dev_flags,
    .is_running     = ovs_netdev_is_running,
    .get_operstate  = ovs_netdev_get_operstate,
    .get_ifindex    = ovs_netdev_get_ifindex,
    .get_mtu        = ovs_netdev_get_mtu,
    .send           = netdev_send,
};
So when the port is a network device (vport-netdev.c), what ovs_vport_add() ultimately calls is netdev_create(), and the key step in netdev_create() is registering the callback that runs when a packet is received:
err = netdev_rx_handler_register(netdev_vport->dev, netdev_frame_hook, vport);
With this, the data arriving on netdev_vport->dev is handed to netdev_frame_hook(). After some auxiliary processing the handlers are called in turn: ovs_vport_receive() brings control back into vport.c, and ovs_dp_process_received_packet() brings it into datapath.c for unified processing.
netdev_frame_hook() → netdev_port_receive() → ovs_vport_receive() → ovs_dp_process_received_packet()
ovs_dp_process_received_packet() (datapath/datapath.c) performs the heavyweight packet processing: it looks up the flow table and executes the matching actions. If the lookup misses, ovs_dp_upcall() sends an upcall to user space (ovs-vswitchd), and from that point processing is handed over to ovsd.
ovs_dp_process_received_packet() → ovs_dp_upcall() → queue_userspace_packet()
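A simplified sketch of the lookup-or-upcall logic in ovs_dp_process_received_packet() is shown below. Statistics and error handling are omitted, and the exact signatures (and fields such as upcall_pid) vary between OVS versions:

/* Simplified sketch of the receive path in ovs_dp_process_received_packet()
 * (datapath/datapath.c). */
void ovs_dp_process_received_packet(struct vport *p, struct sk_buff *skb)
{
    struct datapath *dp = p->dp;
    struct sw_flow_key key;
    struct sw_flow *flow;
    int key_len;

    /* 1. Extract a flow key from the packet headers. */
    if (ovs_flow_extract(skb, p->port_no, &key, &key_len))
        goto drop;

    /* 2. Look the key up in the kernel flow table. */
    flow = ovs_flow_tbl_lookup(rcu_dereference(dp->table), &key, key_len);
    if (!flow) {
        /* 3a. Miss: hand the packet to ovs-vswitchd as an upcall. */
        struct dp_upcall_info upcall = {
            .cmd = OVS_PACKET_CMD_MISS,
            .key = &key,
            .pid = p->upcall_pid,
        };
        ovs_dp_upcall(dp, skb, &upcall);
        consume_skb(skb);
        return;
    }

    /* 3b. Hit: execute the actions cached for this flow. */
    OVS_CB(skb)->flow = flow;
    ovs_execute_actions(dp, skb);
    return;

drop:
    kfree_skb(skb);
}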

ovs-vswitchd fast-path processing

ovs-vswitchd uses bridge_run()/bridge_run_fast() (vswitchd/bridge.c) to keep polling the bridges and carry out the corresponding operations.
It mainly calls ofproto_run_fast() and ofproto_run() to drive the ofproto processing on each bridge. The run_fast() path skips some work that is not needed there and mainly handles upcalls, so it runs faster.

bridge_run_fast()

Let us start with the fast path; bridge_run_fast() is:
void
bridge_run_fast(void)
{
    struct bridge *br;

    HMAP_FOR_EACH (br, node, &all_bridges) {
        ofproto_run_fast(br->ofproto);
    }
}
ofproto_run_fast() calls the corresponding run_fast() member of struct ofproto_class. struct ofproto_class (ofproto/ofproto-provider.h) is an abstract class, and run_fast() is a function pointer that, in practice, gets bound in the course of bridge_run().
The possible ofproto_class types are recorded in the ofproto_classes[] array, which is initialized by ofproto_initialize(). In ofproto/ofproto.c:
static void
ofproto_initialize(void)
{
    static bool inited;

    if (!inited) {
        inited = true;
        ofproto_class_register(&ofproto_dpif_class);
    }
}
ofproto_class_register() is defined as follows:
int
ofproto_class_register(const struct ofproto_class *new_class)
{
    size_t i;

    for (i = 0; i < n_ofproto_classes; i++) {
        if (ofproto_classes[i] == new_class) {
            return EEXIST;
        }
    }

    if (n_ofproto_classes >= allocated_ofproto_classes) {
        ofproto_classes = x2nrealloc(ofproto_classes,
                                     &allocated_ofproto_classes,
                                     sizeof *ofproto_classes);
    }
    ofproto_classes[n_ofproto_classes++] = new_class;
    return 0;
}
So after ofproto_initialize() (which is called from several places but only runs once), ofproto_classes contains a single entry, ofproto_dpif_class. ofproto_dpif_class is defined in ofproto/ofproto-dpif.c and lists all the member fields and operation functions:
const struct ofproto_class ofproto_dpif_class = {
    enumerate_types,
    enumerate_names,
    del,
    alloc,
    construct,
    destruct,
    dealloc,
    run,
    run_fast,
    wait,
    get_memory_usage,
    flush,
    get_features,
    get_tables,
    port_alloc,
    port_construct,
    port_destruct,
    port_dealloc,
    port_modified,
    port_reconfigured,
    port_query_by_name,
    port_add,
    port_del,
    port_get_stats,
    port_dump_start,
    port_dump_next,
    port_dump_done,
    port_poll,
    port_poll_wait,
    port_is_lacp_current,
    NULL,                       /* rule_choose_table */
    rule_alloc,
    rule_construct,
    rule_destruct,
    rule_dealloc,
    rule_get_stats,
    rule_execute,
    rule_modify_actions,
    set_frag_handling,
    packet_out,
    set_netflow,
    get_netflow_ids,
    set_sflow,
    set_cfm,
    get_cfm_fault,
    get_cfm_opup,
    get_cfm_remote_mpids,
    get_cfm_health,
    set_stp,
    get_stp_status,
    set_stp_port,
    get_stp_port_status,
    set_queues,
    bundle_set,
    bundle_remove,
    mirror_set,
    mirror_get_stats,
    set_flood_vlans,
    is_mirror_output_bundle,
    forward_bpdu_changed,
    set_mac_idle_time,
    set_realdev,
};
ofproto_initialize() gets invoked many times; one possible call path is:
bridge_run() → bridge_reconfigure() → bridge_update_ofprotos() → ofproto_create() → ofproto_initialize()
Besides this path, ofproto_class_find__() also calls ofproto_initialize().
Therefore the function pointers in ofproto_class actually point at the functions of ofproto_dpif_class.
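The dispatch itself is just a call through the class pointer. A simplified sketch of ofproto_run_fast() (ofproto/ofproto.c) follows, with the rate-limited error logging of the real code reduced to a plain VLOG_ERR:

/* Simplified sketch of ofproto_run_fast() (ofproto/ofproto.c): the
 * high-level call forwards to the registered class, which in practice is
 * always ofproto_dpif_class. */
int
ofproto_run_fast(struct ofproto *p)
{
    int error;

    error = p->ofproto_class->run_fast ? p->ofproto_class->run_fast(p) : 0;
    if (error && error != EAGAIN) {
        VLOG_ERR("%s: fastpath processing failed (%s)",
                 p->name, strerror(error));
    }
    return error;
}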

run_fast()

Now look at run_fast() of ofproto_dpif_class (ofproto/ofproto-dpif.c):
static int
run_fast(struct ofproto *ofproto_)
{
    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
    struct ofport_dpif *ofport;
    unsigned int work;

    HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
        port_run_fast(ofport);
    }

    /* Handle one or more batches of upcalls, until there's nothing left to do
     * or until we do a fixed total amount of work.
     *
     * We do work in batches because it can be much cheaper to set up a number
     * of flows and fire off their patches all at once.  We do multiple batches
     * because in some cases handling a packet can cause another packet to be
     * queued almost immediately as part of the return flow.  Both
     * optimizations can make major improvements on some benchmarks and
     * presumably for real traffic as well. */
    work = 0;
    while (work < FLOW_MISS_MAX_BATCH) {
        int retval = handle_upcalls(ofproto, FLOW_MISS_MAX_BATCH - work);
        if (retval <= 0) {
            return -retval;
        }
        work += retval;
    }
    return 0;
}
In essence it mainly calls handle_upcalls(), which is natural: one of the most important jobs ovs-vswitchd has on each bridge is to listen for and handle that bridge's requests.

handle_upcalls()

The code is as follows:
static int
handle_upcalls(struct ofproto_dpif *ofproto, unsigned int max_batch)
{
    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
    struct ofpbuf miss_bufs[FLOW_MISS_MAX_BATCH];
    uint64_t miss_buf_stubs[FLOW_MISS_MAX_BATCH][4096 / 8];
    int n_processed;
    int n_misses;
    int i;

    assert(max_batch <= FLOW_MISS_MAX_BATCH);

    n_misses = 0;
    for (n_processed = 0; n_processed < max_batch; n_processed++) {
        struct dpif_upcall *upcall = &misses[n_misses];
        struct ofpbuf *buf = &miss_bufs[n_misses];
        int error;

        ofpbuf_use_stub(buf, miss_buf_stubs[n_misses],
                        sizeof miss_buf_stubs[n_misses]);
        error = dpif_recv(ofproto->dpif, upcall, buf);
        if (error) {
            ofpbuf_uninit(buf);
            break;
        }

        switch (classify_upcall(upcall)) {
        case MISS_UPCALL:
            /* Handle it later. */
            n_misses++;
            break;

        case SFLOW_UPCALL:
            if (ofproto->sflow) {
                handle_sflow_upcall(ofproto, upcall);
            }
            ofpbuf_uninit(buf);
            break;

        case BAD_UPCALL:
            ofpbuf_uninit(buf);
            break;
        }
    }

    /* Handle deferred MISS_UPCALL processing. */
    handle_miss_upcalls(ofproto, misses, n_misses);
    for (i = 0; i < n_misses; i++) {
        ofpbuf_uninit(&miss_bufs[i]);
    }

    return n_processed;
}
This code handles the relevant upcalls, including MISS_UPCALL (the datapath found no matching flow) and SFLOW_UPCALL (sFlow sampling traffic). The MISS_UPCALL part is handled mainly in handle_miss_upcalls() (ofproto/ofproto-dpif.c), whose core steps are handle_flow_miss() and dpif_operate(): handle_flow_miss() works out the actions to take for each upcall, and dpif_operate() executes those actions.
    HMAP_FOR_EACH (miss, hmap_node, &todo) {
        handle_flow_miss(ofproto, miss, flow_miss_ops, &n_ops);
    }
    assert(n_ops <= ARRAY_SIZE(flow_miss_ops));

    /* Execute batch. */
    for (i = 0; i < n_ops; i++) {
        dpif_ops[i] = &flow_miss_ops[i].dpif_op;
    }
    dpif_operate(ofproto->dpif, dpif_ops, n_ops);

handle_flow_miss()

This function lives in ofproto/ofproto-dpif.c; the main flow is:
static void
handle_flow_miss(struct ofproto_dpif *ofproto, struct flow_miss *miss,
                 struct flow_miss_op *ops, size_t *n_ops)
{
    struct facet *facet;
    uint32_t hash;

    /* The caller must ensure that miss->hmap_node.hash contains
     * flow_hash(miss->flow, 0). */
    hash = miss->hmap_node.hash;

    facet = facet_lookup_valid(ofproto, &miss->flow, hash);
    if (!facet) {
        struct rule_dpif *rule = rule_dpif_lookup(ofproto, &miss->flow);

        if (!flow_miss_should_make_facet(ofproto, miss, hash)) {
            handle_flow_miss_without_facet(miss, rule, ops, n_ops);
            return;
        }

        facet = facet_create(rule, &miss->flow, hash);
    }
    handle_flow_miss_with_facet(miss, facet, ops, n_ops);
}
The process has two parts. First, facet_lookup_valid() searches the local facet table for an entry that exactly matches the flow.
If there is no facet, rule_dpif_lookup() looks up the matching rule, and flow_miss_should_make_facet() decides whether it is worth adding a corresponding entry to ovsd's ofproto and writing it down to the datapath (usually it is). If not, handle_flow_miss_without_facet() writes the resulting actions for the rule into ops and returns; otherwise facet_create() adds a new facet inside the ofproto.
If a facet does exist, handle_flow_miss_with_facet() updates ops. handle_flow_miss_with_facet() calls handle_flow_miss_common() to check the state: if the bridge is in fail-open mode, a miss message is sent to the controller (in ofproto/ofproto-dpif.c, via send_packet_in_miss() → connmgr_send_packet_in()). After that it checks whether the flow must be marked as a slow flow, and so on.
Take rule_dpif_lookup() as an example. It in turn calls rule_dpif_lookup__(), whose code is:
static struct rule_dpif *
rule_dpif_lookup__(struct ofproto_dpif *ofproto, const struct flow *flow,
                   uint8_t table_id)
{
    struct cls_rule *cls_rule;
    struct classifier *cls;

    if (table_id >= N_TABLES) {
        return NULL;
    }

    cls = &ofproto->up.tables[table_id].cls;
    if (flow->nw_frag & FLOW_NW_FRAG_ANY
        && ofproto->up.frag_handling == OFPC_FRAG_NORMAL) {
        /* For OFPC_NORMAL frag_handling, we must pretend that transport ports
         * are unavailable. */
        struct flow ofpc_normal_flow = *flow;
        ofpc_normal_flow.tp_src = htons(0);
        ofpc_normal_flow.tp_dst = htons(0);
        cls_rule = classifier_lookup(cls, &ofpc_normal_flow);
    } else {
        cls_rule = classifier_lookup(cls, flow);
    }
    return rule_dpif_cast(rule_from_cls_rule(cls_rule));
}
Here classifier_lookup() searches the rule tables stored in the ofproto, finds the highest-priority matching rule, and returns it.

dpif_operate()

dpif_operate(ofproto->dpif, dpif_ops, n_ops) executes the operations determined after handle_flow_miss().
The main code is:
/* Execute batch. */
    for (i = 0; i < n_ops; i++) {
        dpif_ops[i] = &flow_miss_ops[i].dpif_op;
    }
    dpif_operate(ofproto->dpif, dpif_ops, n_ops);
dpif_operate() first checks whether the concrete dpif_class provides an operate() function and, if so, calls it. Otherwise it dispatches on the op type and calls the dpif_class's flow_put(), flow_del(), or execute().
When operate() exists
Consider first the case where operate() is provided.
dpif->dpif_class->operate(dpif, ops, n_ops);
Which operate() this is depends on the concrete class. The available dpif_class types are again declared through base_dpif_classes[] in lib/dpif.c:
static const struct dpif_class *base_dpif_classes[] = {
#ifdef HAVE_NETLINK
    &dpif_linux_class,
#endif
    &dpif_netdev_class,
};
dpif_linux_class talks to the local datapath over netlink, while dpif_netdev_class is the userspace datapath implementation that does not go through the kernel module. The analysis here follows the common dpif-linux case, whose operate() is dpif_linux_operate() (lib/dpif-linux.c).
dpif_linux_operate() mainly calls dpif_linux_operate__(), which walks the incoming dpif_op **ops and, for each action according to its type (PUT, DEL, EXECUTE), builds a corresponding aux->request message; it then calls nl_sock_transact_multiple() (lib/netlink-socket.c) to send the requests and collect the replies:
    nl_sock_transact_multiple(genl_sock, txnsp, n_ops);
Note that txnsp carries both the outgoing requests and the returned replies.
Afterwards the replies are examined and the relevant statistics are updated.
When operate() does not exist
If a concrete dpif_class does not provide operate(), the different kinds of operations are handled one by one:
    for (i = 0; i < n_ops; i++) {
        struct dpif_op *op = ops[i];

        switch (op->type) {
        case DPIF_OP_FLOW_PUT:
            op->error = dpif_flow_put__(dpif, &op->u.flow_put);
            break;

        case DPIF_OP_FLOW_DEL:
            op->error = dpif_flow_del__(dpif, &op->u.flow_del);
            break;

        case DPIF_OP_EXECUTE:
            op->error = dpif_execute__(dpif, &op->u.execute);
            break;

        default:
            NOT_REACHED();
        }
    }
This covers the three op types DPIF_OP_FLOW_PUT, DPIF_OP_FLOW_DEL, and DPIF_OP_EXECUTE (all defined in lib/dpif.h).
enum dpif_op_type {
    DPIF_OP_FLOW_PUT = 1,
    DPIF_OP_FLOW_DEL,
    DPIF_OP_EXECUTE,
};
The third case, DPIF_OP_EXECUTE, is worth noting because the execute command must be sent back to the datapath: dpif_execute__() calls the execute() function of the dpif_class abstract type stored in the dpif structure.
Again taking dpif-linux (lib/dpif-linux.c) as the example, the class is defined as:
const struct dpif_class dpif_linux_class = {
    "system",
    dpif_linux_enumerate,
    dpif_linux_open,
    dpif_linux_close,
    dpif_linux_destroy,
    dpif_linux_run,
    dpif_linux_wait,
    dpif_linux_get_stats,
    dpif_linux_port_add,
    dpif_linux_port_del,
    dpif_linux_port_query_by_number,
    dpif_linux_port_query_by_name,
    dpif_linux_get_max_ports,
    dpif_linux_port_get_pid,
    dpif_linux_port_dump_start,
    dpif_linux_port_dump_next,
    dpif_linux_port_dump_done,
    dpif_linux_port_poll,
    dpif_linux_port_poll_wait,
    dpif_linux_flow_get,
    dpif_linux_flow_put,
    dpif_linux_flow_del,
    dpif_linux_flow_flush,
    dpif_linux_flow_dump_start,
    dpif_linux_flow_dump_next,
    dpif_linux_flow_dump_done,
    dpif_linux_execute,
    dpif_linux_operate,
    dpif_linux_recv_set,
    dpif_linux_queue_to_priority,
    dpif_linux_recv,
    dpif_linux_recv_wait,
    dpif_linux_recv_purge,
};
So the function that runs is dpif_linux_execute() (lib/dpif-linux.c), which first calls dpif_linux_execute__():
static int
dpif_linux_execute__(int dp_ifindex, const struct dpif_execute *execute)
{
    uint64_t request_stub[1024 / 8];
    struct ofpbuf request;
    int error;

    ofpbuf_use_stub(&request, request_stub, sizeof request_stub);
    dpif_linux_encode_execute(dp_ifindex, execute, &request);
    error = nl_sock_transact(genl_sock, &request, NULL);
    ofpbuf_uninit(&request);

    return error;
}
This function builds an OVS_PACKET_CMD_EXECUTE nlmsg and sends it to the datapath with nl_sock_transact().
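For completeness, here is a simplified sketch of what dpif_linux_encode_execute() puts into that request. The OVS_PACKET_ATTR_* attribute usage is the general idea; the exact dpif_execute field names shown are assumptions about that version's struct layout:

/* Simplified sketch of dpif_linux_encode_execute() (lib/dpif-linux.c):
 * an OVS_PACKET_CMD_EXECUTE message carrying the raw packet, its flow key,
 * and the actions to apply. */
static void
dpif_linux_encode_execute(int dp_ifindex, const struct dpif_execute *d_exec,
                          struct ofpbuf *buf)
{
    struct ovs_header *execute;

    nl_msg_put_genlmsghdr(buf, 0, ovs_packet_family, NLM_F_REQUEST,
                          OVS_PACKET_CMD_EXECUTE, OVS_PACKET_VERSION);

    execute = ofpbuf_put_uninit(buf, sizeof *execute);
    execute->dp_ifindex = dp_ifindex;

    nl_msg_put_unspec(buf, OVS_PACKET_ATTR_PACKET,
                      d_exec->packet->data, d_exec->packet->size);
    nl_msg_put_unspec(buf, OVS_PACKET_ATTR_KEY, d_exec->key, d_exec->key_len);
    nl_msg_put_unspec(buf, OVS_PACKET_ATTR_ACTIONS,
                      d_exec->actions, d_exec->actions_len);
}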

ovs-vswitchd full processing

The complete configuration and processing is driven by bridge_run().
This function performs the full set of bridge operations, including handling OpenFlow commands, maintaining the bridges, synchronizing with the database, and keeping the connections to the controller alive. It then calls ofproto_run() to handle the related ofproto work (see 3.2.17).
Finally it checks whether it is time for the periodic log and records it if so (the default period is LLONG_MAX), and refreshes the various connections and status information.