EVPath
The EVdfg interface

EVdfg interface

EVdfg is designed to be a relatively thin layer on top of EVPath that enables whole-system multi-process deployment of Data Flow Graphs implemented with EVPath stones. Like EVPath, EVdfg has abstractions to create stones, to assign actions to them, to link stones to each other and to create sources and sinks of data. However, in EVdfg the stones created are a virtual representation of the actual data flow graph that is to be realized in EVPath, with EVdfg controlling the mapping (and potentially remapping) of EVPath stones to the participating processes (nodes).

Unlike EVpath, EVdfg has knowledge of the entire set of processes that make up the 'cohort'. In particular, in EVdfg a single, differentiated 'Master' process is responsible for defining the entire DFG. Code in the master creates the virtual stones, assigns actions to those stones, links them together to create the topology, and assigns stones to nodes. It also determines when sufficient 'client' processes have joined the 'cohort', either by specifying a static list, or by making a decision in a handler as each new client process joins. Once all required nodes have joined and the virtual DFG has been created, it is realized by the master (I.E. actual EVPath stones are created at the appropriate places) and starts running. The master may also register handlers that can be called in the event of new nodes joining, an existing node becoming unreachable, or a stone calling for reconfiguration. In the case of each of these, the master may rearrange the set of virtual stones, moving some, adding others, changing links, etc. and then can realize the changed DFG. The master can also (optionally) arbitrate an orderly shutdown of all client processes with an agreed-upon exit value.

Generally, but not necessarily, the master process is also a client and a participant in the cohort. The other participating processes are relatively passive. They identify their capabilities to the master, but do not directly contribute stones to the DFG instead accepting the stones that are assigned to them.

The Joining Process

The subroutine EVclient_assoc() is used to associate a client with a master. A similar routine, EVclient_assoc_local() is used by the the master process to initiate client-like participation in the cohort it controls. The goal of the joining process is for EVdfg to know when all participating processes have joined and to have a unique name for each process (string node names are used for stone assignment).

Static Joining mode

In the simplest form of the join, called `static joining' mode, the following things occur:

In this static mode, the pseudocode for the master looks like this:

EVmaster test_master;
char *str_contact;
char *nodes[] = {"a", "b", NULL};
/* CM initialization occurs here */
test_master = EVmaster_create(cm);
str_contact = EVmaster_get_contact_list(test_master);
EVmaster_register_node_list(test_master, &nodes[0]);
/* EVclient capability registration occurs here */
/* EVdfg stone creation and assignment occurs here */
EVdfg_realize(test_dfg);
/* We're node "a" in the DFG */
test_client = EVclient_assoc_local(cm, "a", master, source_capabilities, sink_capabilities););
/* Somehow provide str_contact to the other participant(s) */
/* Wait until everyone has joined */
EVclient_ready_wait(test_client);

While the client is somewhat simpler:

EVclient test_client;
char *str_contact;
/* CM initialization and EVclient capability registration occurs here */
/* We're node "b" in the cohort */
test_client = EVclient_assoc(test_client, "b", str_contact, source_capabilities, sink_capabilities);
/* Wait until everyone has joined */
EVclient_ready_wait(test_client);

For the moment, please ignore the 'source_capabilities' and 'sink_capabilities' parameters to EVclient_assoc() and EVclient_assoc_local(). Those will be explained below.

Dynamic Joining mode

There are situations in which the "static joining" scenario described above is too rigid. 'Dynamic Joining' mode relaxes several requirements above. First, rather than using a static node list (via EVmaster_register_node_list()) and creating the entire virtual DFG before all the nodes join, the master registers a EVmasterJoinHandlerFunc which is called each time a client joins. This join function may use its own criterion in deciding when sufficient clients have joined. When it decides that enough have joined, then it should create the virtual DFG and call EVdfg_realize(). In this situation, the requirement that each client process specify a unique name to EVclient_assoc() is also relaxed. Instead, each client can specify a generic name (like "client") and the join handler can associate a unique canonical name with that client using EVmaster_assign_canonical_name(). In this case, the canonical names assigned must be unique and are the names used in EVdfg_assign_node().

In dynamic mode, the pseudocode for the client is unchanged (except that the specified name need not be unique). But the code structure for the master is changes:

EVmaster test_master;
char *str_contact;
/* CM initialization and EVclient capability registration occurs here */
test_master = EVmaster_create(cm);
str_contact = EVmaster_get_contact_list(test_master);
EVmaster_node_join_handler (test_master, join_handler_func);
/* We're node "a" in the set of participating nodes */
test_client = EVclient_assoc_local(cm, "client", test_master, source_capabilities, sink_capabilities););
/* Somehow provide str_contact to the other participant(s) */
/* Wait until everyone has joined and a DFG has been deployed */
EVclient_ready_wait(test_client);

The join handler function in dynamic mode will generally follow this form:

void
join_handler_func(EVmaster master, char *identifier, void*cur_unused1, void*cur_unused2)
{
char *canon_name;
/* Generate a canonical name for this client in canon_name, then assign */
EVmaster_assign_canonical_name(master, identifier, canon_name);
if ( /* not ready to start running */) return;
/* EVdfg stone creation and assignment occurs here */
/*
* upon return, the DFG will be instantiated and all clients fall out
* of EVclient_ready_wait();
*/
}

The join handler function is also used for dynamic reconfiguration of the DFG, described later.

EVclient Capability Registration

The phrase "EVclient capability registration" occurs in the pseudocode comments above. Due to the dynamic code generation and FFS-based data representation in EVPath, many stones can be placed on any process, without regard to architecture, OS or other considerations. However data sources and data sinks necessarily require the cooperation of the receiving process. In particular, for an actual EVpath EVsource handle, the process must itself call EVsubmit() on that handle. For a sink (an EVPath terminal action, where data is delivered to a native subroutine), EVdfg must know the jump address for that subroutine and the exact format of data that it expects.

In order to preserve flexibility for the master to map the DFG to the nodes as it desires, sources and sink handlers are presented as 'capabilities' in EVdfg. That is, sources are created by clients and made available for use, but the master may or may not actually use them. Similarly, the jump addresses and data expectations of handler subroutines are associated with "handler names" in the client. Stones with sink actions associated with those handler names may or may not be assigned individual clients.

EVdfg Source Capabilities

EVdfg clients register source capabilities by first creating an EVPath source handle as in normal EVPath, but specifying the value 'DFG_SOURCE' as the target stone ID. Then the EVdfg routine EVclient_register_source() is used to associate a name with the source handle. Unlike EVPath, where EVsource handles are associated with particular local stones, but are not considered stones themselves, EVdfg sources are full virtual stones, assignable to different clients like other virtual stones. The master creates them with EVdfg_create_source_stone(), specifying a source name. This source name must match a registered source on the client to which the source stone is assigned. Source names should be unique within each client, no client EVsource handle can have more than one EVdfg_stone associated with it. The return value from EVclient_register_source() is of type EVclient_sources and represents a handle to the source capabilities of this client. The value is adjusted each time EVclient_register_source() is called. All calls to EVclient_register_source() should happen before the call to EVclient_assoc() or EVclient_assoc_local() and the return value of the last call to EVclient_register_source() should be passed to EVclient_assoc() or EVclient_assoc_local() as the source_capabilities parameter. (The return values of prior calls can be ignored.)

EVdfg Sink Capabilities

EVdfg clients register sink (terminal handler) capabilities by using the routine EVclient_register_sink_handler(). Essentially, this establishes an association between an particular handler subroutine, the FMStructDescList describing the data it expects, and a handler name. The master creates sink stones with EVdfg_create_sink_stone(), or associates a new sink action with an existing stone with EVdfg_add_sink_action(). In each case, the handler_name specified in the call must match one that has appeared in a EVclient_register_sink_handler() call on the client to which that sink stone is assigned. Unlike sources, there is no restriction on how many different sink stones may be associated with the same sink handler. The return value from EVclient_register_sink_handler() is of type EVclient_sinks and represents a handle to the sink capabilities of this client. The value is adjusted each time EVclient_register_sink_handler() is called. All calls to EVclient_register_sink_handler() should happen before the call to EVclient_assoc() or EVclient_assoc_local() and the return value of the last call to EVclient_register_sink_handler() should be passed to EVclient_assoc() or EVclient_assoc_local() as the sink_capabilities parameter. (The return values of prior calls can be ignored.)

Run-time Semantics

It is sometimes useful to clients to know, after a DFG has been realized, if a particular local source has had an EVdfg source stone associated with it. The routine EVclient_source_active(EVsource src) returns a boolean value on this condition (True if associated, False if not). During DFG run-time, clients usually test if a source is active so that they know if data is to be submitted to it, as in the code below:

if (EVclient_source_active(source_handle)) {
simple_rec rec;
generate_simple_record(&rec);
/* submit would be quietly ignored if source is not active */
EVsubmit(source_handle, &rec, NULL);
}

As the code comments indicate, a submit to an inactive source would be quietly ignored by EVPath, but it's better form to test first.

For local sinks, that the actual handler gets called when data arrives is the only real indication that a source handle has been utilized. However, a count of the total sink stones assigned to a particular node is available via the routine EVclient_active_sink_count(EVclient client). This is sometimes useful for clients who need to know if they should be waiting for particular data before signalling for shutdown, or if they are just waiting patiently for the other nodes to complete operation.

Capability Example

The code below is a sample for a client that is capable of being a source for "simple" data, or for hosting terminal stones that handle "simple" data.

source_handle = EVcreate_submit_handle(cm, -1, simple_format_list);
source_capabilities = EVclient_register_source("data_source", source_handle);
sink_capabilities = EVdfg_register_sink_handler(cm, "simple_handler", simple_format_list,
(EVSimpleHandlerFunc) simple_handler, NULL);

As noted previously, the source_capabilies and sink_capabilities are of types EVclient_sources and EVclient_sinks, respectively, and the last return value of each should be passed to EVassoc_client() or EVassoc_client_local(). These capabilities represent what the client is capable* of doing. What it will actually do depends upon what resources are assigned to it by the master.

EVdfg Stone Creation and Assignment

In EVdfg, DFG stone creation occurs much like it does in raw EVPath with a few major differences:

These are the basic DFG creation functions:

Function

Purpose

EVdfg_create_stone() Create a stone with a specified action
EVdfg_add_action() Add an action to an exiting stone
EVdfg_create_source_stone() Create a stone to source events
EVdfg_create_sink_stone() Create a stone to be a sink for events
EVdfg_add_sink_action() Add a sink action to an existing stone
EVdfg_link_dest() Connect the next available output port on one stone to deliver events to another
EVdfg_link_port() Connect the numeric output port on one stone to deliver events to another
EVdfg_assign_node() Assign an EVdfg_stone to be instantiated at a particular client node

Examples

Two node - Two stone example

These functions, together with those described previously are enough to create a couple of simple example programs: dfg_master.c dfg_client.c

In order to run these, first run 'dfg_master' and you should see output like this:

bash-3.2$ ./dfg_master
Contact list is "AAIBAClXsP6Cz2UZDW9pXpllAADViN+OAwAAAAUAZW5ldAAA"

The text within the quotation marks is the contact string for the DFG. Copy it and use it as the first argument to dfg_client, like this:

bash-3.2$ ./dfg_client "b" "AAIBAClXsP6Cz2UZDW9pXpllAADViN+OAwAAAAUAZW5ldAAA"

On the master side, you should see the output "I got 318".

Some things to note about these examples:

Three node - Three stone example

In order to change the above two node example to something that covers three nodes, we only need modify the nodes array:

char *nodes[] = {"a", "b", "c", NULL};

And change the DFG creation section so that it creates three stones for us to assign:

src = EVdfg_create_source_stone(test_dfg, "event source");
EVdfg_assign_node(src, "a");
mid = EVdfg_create_stone(test_dfg, NULL);
EVdfg_assign_node(mid, "b");
EVdfg_link_port(src, 0, mid);
sink = EVdfg_create_sink_stone(test_dfg, "simple_handler");
EVdfg_assign_node(sink, "c");
EVdfg_link_port(mid, 0, sink);
EVdfg_realize(test_dfg);

This example, dfg_master2.c is run like the first, but with two clients:

bash-3.2$ ./dfg_master2
Contact list is "AAIBAClXsP7AqAF3DW9pXsplAADViN+OAwAAAAUAZW5ldAAA"
bash-3.2$ ./dfg_client "b" "AAIBAClXsP7AqAF3DW9pXsplAADViN+OAwAAAAUAZW5ldAAA" &
bash-3.2$ ./dfg_client "c" "AAIBAClXsP7AqAF3DW9pXsplAADViN+OAwAAAAUAZW5ldAAA"

Some things to note about this example:

EVdfg Termination

In the previous examples, the final call is CMrun_network(cm), which essentially tells EVPath to handle incoming network messages forever. However, while not necessarily suitable for all circumstances, EVdfg does contain a facility for coordinating shutdown between participating EVdfg processes, and even providing them with a common value to pass to exit(). The facility was designed expressly to support the EVdfg regression tests, but it finds use in other circumstances.

The EVdfg shutdown facility generally requires a contribution from all participating nodes before it authorizes shutdown, and if any node indicates a Failure (non-zero) status the all nodes should indicate Failure. However, what type of contribution is appropriate from each node depends upon its circumstances. For example, consider the three node example above. If it were a regression test, it should be considered successful only if the event is submitted and received at the destination with the expected value. For the node hosting the source, it is ready for shutdown only after it has submitted its event, and if it reaches that point then it considers its participation a success. For the node hosting the sink, it is ready for shutdown only after it has the event, and then it considers the test a success or failure depending upon the value of received data. The node hosting neither the source nor sink is a bit trickier. Essentially it shouldn't shutdown until the others say they are ready, and it has no specific contribution to the determination of success or failure of the test.

In order to support all of these circumstances, EVdfg has some informational functions that let the application write shutdown code that doesn't explicitly depend upon how the stones are mapped to nodes. One of these functions is the EVclient_source_active() function that was introduced above. Another is EVclient_active_sink_count(), which returns the number of sinks that have been mapped to the current node. We'll use these, along with the EVdfg shutdown functions, EVclient_shutdown(), EVclient_ready_for_shutdown() and EVclient_wait_for_shutdown(), in order to cleanly handle shutdown in our example programs.

In particular, in the place of the call to CMrun_network() we'll use the following code segment in both master and client:

if (EVclient_active_sink_count(test_client) > 0) {
/* if there are active sinks, the handler will call EVclient_shutdown() */
} else {
if (EVclient_source_active(source_handle)) {
/* we had a source and have already submitted, indicate success */
EVclient_shutdown(test_client, 0 /* success */);
} else {
/* we had neither a source or sink, ready to shutdown, no opinion */
}
}
return(EVclient_wait_for_shutdown(test_client));

Additionally, we add the line:

    EVclient_shutdown((EVclient)client_data, event->integer_field == 318);

to the simpler_handler function. With these changes, all processes end up in EVclient_wait_for_shutdown() instead of CMrun_network(). Additionally:

That EVclient_wait_for_shutdown() is in the return statement means that each node will exit() with the same result code, which will be a success code if the event is delivered properly.

These examples, dfg_master3.c and dfg_client3.c are run like the three node example above.

Another example, dfg_master4.c is a "dynamic joining" version of dfg_master3. Run dfg_master4 with two instances of dfg_client3, but it doesn't matter what the first argument to dfg_client3 is. The first one to join will be given the canonical name "b" and the second one will be given "c". The second will have the sink stone and show the expected output.

Reconfiguration

EVdfg allows reconfiguration, changing the layout and contents of the DFG, after the initial realization; either upon node failure (as detected by a write file in the DFG), the joining of a new node, or through voluntary reconfiguration (via a CoD handler in the DFG calling EVdfg_trigger_reconfiguration()).

Specifying modifications to the deployed DFG are done with the same DFG creation functions detailed above, with some additions detailed below:

Function

Purpose

EVdfg_unlink_port() Remove the link on a specific output index
EVdfg_unlink_dest() Remove the link to a specific destination/target stone
EVdfg_destroy_stone() Remove a specific stone from the DFG

In addition, the functions EVdfg_set_attr_list() and EVdfg_get_attr_list() may be particularly useful in the reconfiguration process. In particular, when EVdfg_set_attr_list() is done prior to DFG realization, the attribute list that is supplied becomes the 'stone_attrs' value that is available for manipulation by actions running on the stone (See EVPath docs). Certain EVdfg-provided functions that are also available in actions cause these client-level stone attributes on individual clients to be flushed back to the master. Once flushed, those changed attributes are available for query via EVdfg_get_attr_list(). This basic communication mechanism allows for self-initiated DFG reconfiguration.

Future enhancements of EVdfg may allow the master to manage multiple independent DFGs simultaneously. In that circumstance, this API may change. Additional future enhancements to EVdfg may allow the application to query the state and connectivity of the DFG (I.E. to ask which nodes to link to a particular node, etc.). Without that enhancement in place, the master program must maintain it's own knowledge about what stones exist and how they are connected in order to change that connectivity in a reconfiguration handler.

Handling node failure

In order to handle node/process failure, the master must register a fail handler function with EVmaster_node_fail_handler():

typedef void (*EVmasterFailHandlerFunc) (EVdfg dfg, char *failed_client, int failed_stone);

Note that the master process is a single point of failure for the entire system. If the master process fails, the DFG is not recoverable and client behaviour is unpredictable. However, if a non-master client fails it can be handled cleanly and the DFG reconfigured, albeit with potential loss of events and data at the failed node. EVdfg detects node failure when event flow is distrupted by a failed network write. When this happens, the occurrence is reported to the master, the master determines which node and stone were the intended target of the network transmission and calls any failure handler with the DFG. The details of the fail handler vary widely by application, but the regression test evpath/dfg_tests/fail_chain_test.c may be used as an example. The important concepts are that the fail handler modify the prior DFG to a consistent state using the EVdfg_* APIs and then once again call EVdfg_realize().

Handling after-deployment node joins

In order to handle nodes joining after the initial deployment, the master must use "dynamic joining" as described above. I.E. it must register a node join handler function with EVmaster_node_join_handler(). In this case, the join handler will simply be called when new nodes join as it was prior to the initial realization. As with the fail handler, when called post-realization, the join handler should reconfigure the DFG as desired to accomodate the newly joined node and then should call EVdfg_realize().

Note that the join handler gets called with the EVmaster as a parameter while the other reconfiguration handlers are called with EVdfg as a first parameter. This arranement will change as EVdfg is modified to introduce multiple simultaneous DFGs, but exists at the moment because the join handler must also assign a canonical name for the new node. An example of handling late-joining nodes can be found in evpath/dfg_tests/fail_chain_test.c.

Handling self-initated DFG reconfiguration

The last type of reconfiguration that EVdfg is capable of is self-initiated reconfiguration. I.E. reconfiguration that is not initiated by a change in the set of underly nodes in the cohort, but rather is initiated by action code in the DFG itself, including possibly by actions running on client nodes.

Here, the reconfiguration handler is called if some action on stone calls 'EVdfg_trigger_reconfiguration()'. Because EVdfg has no actual knowledge of the circumstances which might have prompted this call, the handler on the master is invoked with only the EVdfg as a parameter. However, the call to EVdfg_trigger_reconfiguration() also causes all the stone attributes on that client to be flushed back to the master so that they are available for query and this communication mechanism can be used as a way for action code to convey those circumstances to the master. Again, details vary with the application but the regression test evpath/dfg_tests/self_reconfig_test.c provides an example.