Flatbuffers Message Abstractions¶
The message abstractions are a rather experimental way to more easily create test variations.
Note: This API is subject to change in the future, as some functionality should get decoupled in a more elegant way.
It is currently used to create variations for the tests of gRPC_fb_getInstances.py.
The tests can be found under test_gRPC_fb_getInstances.py.
The idea is to provide default implementations for common datatypes while still
allowing parts of a datatype to be modified. Some flatbuffers implementations
offer a mutation API for this, but the Python implementation unfortunately does
not, and even if it did, it would still introduce other problems.
To tackle the problem, a wrapper is used. It takes a set of enum elements, each
corresponding to one of the datatype's components, and assembles the datatype
with those components "activated". Function pointers define what should happen
if a component is inactive (often just None is returned) and what should happen
if a component is active.
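The mechanism described above can be sketched in a few lines. This is only an illustration of the idea, not the library's implementation: the names Component, ComponentFunctions and resolve_components are hypothetical and do not exist in the code base.

```python
from enum import Enum, auto
from typing import Any, Callable, Dict, NamedTuple, Set


class Component(Enum):
    # Hypothetical components of an imaginary message type.
    LABEL = auto()
    WITHOUT_DATA = auto()


class ComponentFunctions(NamedTuple):
    # Pair of callables: one for the inactive case, one for the active case.
    default_function: Callable[[], Any]
    active_function: Callable[[], Any]


def resolve_components(
    mapping: Dict[Component, ComponentFunctions], active: Set[Component]
) -> Dict[Component, Any]:
    """Call the active function for activated components, the default otherwise."""
    return {
        comp: (funcs.active_function() if comp in active else funcs.default_function())
        for comp, funcs in mapping.items()
    }


MAPPING = {
    Component.LABEL: ComponentFunctions(lambda: None, lambda: "person"),
    Component.WITHOUT_DATA: ComponentFunctions(lambda: False, lambda: True),
}

resolved = resolve_components(MAPPING, active={Component.LABEL})
print(resolved[Component.LABEL])         # person
print(resolved[Component.WITHOUT_DATA])  # False
```

Only LABEL is activated here, so WITHOUT_DATA falls back to its default value.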
Datatype implementations for Query and QueryInstance and their abstraction
model can be found here.
Defining new datatypes for variation testing¶
First, the datatype's components have to be defined as a class inheriting from FrozenEnum, provided through datastructures.py, which is essentially an unmodifiable enum.
This is done below for the FbQuery datatype, whose corresponding flatbuffers
definition can be found here.
from enum import auto

# FrozenEnum is provided by datastructures.py
class EnumFbQuery(FrozenEnum):
    POLYGON = auto()  # def: None
    POLYGONSENSORPOSITION = auto()  # def: None
    FULLY_ENCAPSULATED = auto()  # def: False
    IN_MAP_FRAME = auto()  # def: True
    TIMEINTERVAL = auto()  # def: None
    LABEL = auto()  # def: None
    SPARQL_QUERY = auto()  # def: None
    ONTOLOGY_URI = auto()  # def: None
    MUST_HAVE_ALL_LABELS = auto()  # def: False
    PROJECTUUID = auto()  # def: None
    INSTANCEUUID = auto()  # def: None
    DATAUUID = auto()  # def: None
    WITHOUTDATA = auto()  # def: False
    MAX_NUM_DATA = auto()  # def: None
    SORT_BY_TIME = auto()  # def: False (mapped in FbQuery below)
Then FbQuery inherits from MsgsFb, which itself is a generic type defined in
msgs_base.py.
class FbQuery(MsgsFb[Query.Query]):
    def _set_enum_func_mapping(self) -> Dict[EnumFbQuery, MsgsFunctions]:
        # Each entry is MsgsFunctions(default_function, active_function).
        return {
            EnumFbQuery.POLYGON: MsgsFunctions(
                lambda: None, lambda: Dtypes.Fb.polygon2d(self.builder)
            ),
            EnumFbQuery.POLYGONSENSORPOSITION: MsgsFunctions(
                # TODO provide an example polygon, differing from POLYGON
                lambda: None,
                lambda: Dtypes.Fb.polygon2d(self.builder),
            ),
            EnumFbQuery.FULLY_ENCAPSULATED: MsgsFunctions(
                lambda: False, lambda: True
            ),
            EnumFbQuery.IN_MAP_FRAME: MsgsFunctions(
                lambda: True, lambda: False
            ),
            EnumFbQuery.TIMEINTERVAL: MsgsFunctions(
                lambda: None, lambda: Dtypes.Fb.time_interval(self.builder)
            ),
            EnumFbQuery.LABEL: MsgsFunctions(
                lambda: None, lambda: Dtypes.Fb.label_category(self.builder)
            ),
            EnumFbQuery.SPARQL_QUERY: MsgsFunctions(
                lambda: None, lambda: Dtypes.Fb.sparql_query(self.builder)
            ),
            EnumFbQuery.ONTOLOGY_URI: MsgsFunctions(
                lambda: None, lambda: Dtypes.Fb.ontology_uri(self.builder)
            ),
            EnumFbQuery.MUST_HAVE_ALL_LABELS: MsgsFunctions(
                lambda: False, lambda: True
            ),
            EnumFbQuery.PROJECTUUID: MsgsFunctions(
                lambda: None,
                lambda: Dtypes.Fb.projectuuid(self.builder, self.channel),
            ),
            EnumFbQuery.INSTANCEUUID: MsgsFunctions(
                lambda: None, lambda: self.instanceuuid()
            ),
            EnumFbQuery.DATAUUID: MsgsFunctions(
                lambda: None,
                lambda: Dtypes.Fb.datauuid(self.builder, self.channel),
            ),
            EnumFbQuery.WITHOUTDATA: MsgsFunctions(lambda: False, lambda: True),
            EnumFbQuery.MAX_NUM_DATA: MsgsFunctions(
                lambda: None, lambda: Dtypes.Fb.max_num_data()
            ),
            EnumFbQuery.SORT_BY_TIME: MsgsFunctions(
                lambda: False, lambda: True
            ),
        }

    @expect_component(EnumFbQuery.PROJECTUUID)
    def instanceuuid(self) -> List[int]:
        return Dtypes.Fb.intanceuuid(
            self.builder,
            self.channel,
            self.get_component(EnumFbQuery.PROJECTUUID),
        )

    @expect_component(EnumFbQuery.PROJECTUUID)
    def datauuid(self) -> List[int]:
        return Dtypes.Fb.datauuid(
            self.builder, self.channel, self.get_component(EnumFbQuery.DATAUUID)
        )

    def _assemble_datatype_instance(self):
        # Retrieve all wrapper-managed components, then build the datatype.
        polygon = self.get_component(EnumFbQuery.POLYGON)
        polygon_sensor_position = self.get_component(
            EnumFbQuery.POLYGONSENSORPOSITION
        )
        fully_encapsulated = self.get_component(EnumFbQuery.FULLY_ENCAPSULATED)
        in_map_frame = self.get_component(EnumFbQuery.IN_MAP_FRAME)
        timeinterval = self.get_component(EnumFbQuery.TIMEINTERVAL)
        label = self.get_component(EnumFbQuery.LABEL)
        must_have_all_labels = self.get_component(
            EnumFbQuery.MUST_HAVE_ALL_LABELS
        )
        projectuuid = self.get_component(EnumFbQuery.PROJECTUUID)
        instanceuuid = self.get_component(EnumFbQuery.INSTANCEUUID)
        datauuid = self.get_component(EnumFbQuery.DATAUUID)
        withoutdata = self.get_component(EnumFbQuery.WITHOUTDATA)
        sort_by_time = self.get_component(EnumFbQuery.SORT_BY_TIME)
        return fbh.createQuery(
            self.builder,
            timeInterval=timeinterval,
            labels=label,
            mustHaveAllLabels=must_have_all_labels,
            # ... (remaining arguments not shown in this excerpt)
In MsgsFb, _set_enum_func_mapping() is an abstract method whose return type is
a dictionary that maps the enum elements to MsgsFunctions. MsgsFunctions itself
is just a structure wrapping two function pointers. The first is the
default_function, which gets called if the component is not set to be active.
The second is the one that gets called when the component is set active.
At runtime it is checked whether all elements of the enum are mapped. The functions are mostly mapped to default implementations for that specific component datatype. The default function implementations can be inspected here at the bottom.
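A completeness check of this kind could look roughly as follows. This is a sketch under assumptions: validate_mapping, EnumExample and the choice of KeyError are hypothetical, and the actual validation in msgs_base.py may be implemented differently.

```python
from enum import Enum, auto
from typing import Any, Callable, Dict, Tuple, Type


class EnumExample(Enum):
    POLYGON = auto()
    LABEL = auto()


# (default_function, active_function), standing in for MsgsFunctions.
FunctionPair = Tuple[Callable[[], Any], Callable[[], Any]]


def validate_mapping(enum_type: Type[Enum], mapping: Dict[Enum, FunctionPair]) -> None:
    """Raise KeyError if any element of enum_type has no entry in mapping."""
    missing = [member.name for member in enum_type if member not in mapping]
    if missing:
        raise KeyError(f"unmapped enum elements: {', '.join(missing)}")


complete = {
    EnumExample.POLYGON: (lambda: None, lambda: "polygon"),
    EnumExample.LABEL: (lambda: None, lambda: "label"),
}
validate_mapping(EnumExample, complete)  # passes silently

incomplete = {EnumExample.POLYGON: (lambda: None, lambda: "polygon")}
try:
    validate_mapping(EnumExample, incomplete)
except KeyError as err:
    print("validation failed:", err)
```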
The @expect_component decorator can be used to define dependencies between the
component datatypes, e.g. for the instanceuuid() function to work, the
EnumFbQuery.PROJECTUUID component must be set to active.
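To illustrate how such a dependency check can work, here is a minimal sketch of a decorator in the same spirit. It is not the project's @expect_component: the attribute name active_enums, the Builder class and the RuntimeError raised on failure are all assumptions made for this example.

```python
import functools
from enum import Enum, auto
from typing import Any, Callable, Set


class EnumExample(Enum):
    PROJECTUUID = auto()
    INSTANCEUUID = auto()


def expect_component(*required: EnumExample) -> Callable:
    """Hypothetical decorator: fail early if a required component is inactive."""

    def decorator(method: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            missing = [comp.name for comp in required if comp not in self.active_enums]
            if missing:
                raise RuntimeError(
                    f"{method.__name__}() requires active components: {missing}"
                )
            return method(self, *args, **kwargs)

        return wrapper

    return decorator


class Builder:
    def __init__(self, active_enums: Set[EnumExample]) -> None:
        self.active_enums = active_enums

    @expect_component(EnumExample.PROJECTUUID)
    def instanceuuid(self) -> str:
        return "instance-of-project"


print(Builder({EnumExample.PROJECTUUID}).instanceuuid())  # instance-of-project
```

Calling instanceuuid() on a Builder without PROJECTUUID in its active set raises the RuntimeError instead of silently building an invalid component.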
Lastly, the abstract method _assemble_datatype_instance() has to be implemented.
In this function all of the wrapper-managed components should be retrieved, and
the datatype should be built and returned. The base class makes sure that all
the components are set at this point.
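The interplay between the two abstract methods and the base class can be sketched with a small stand-in. SketchBase, SketchPoint and EnumPoint are hypothetical names, plain tuples replace MsgsFunctions, and no flatbuffers builder is involved; the sketch only shows that components are resolved before the subclass hook runs.

```python
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Any, Callable, Dict, Set, Tuple

FunctionPair = Tuple[Callable[[], Any], Callable[[], Any]]  # (default, active)


class EnumPoint(Enum):
    X = auto()
    Y = auto()


class SketchBase(ABC):
    def __init__(self, active_enums: Set[Enum]) -> None:
        self._active_enums = set(active_enums)
        self._mapping = self._set_enum_func_mapping()
        self.assemble_datatype_instance()

    @abstractmethod
    def _set_enum_func_mapping(self) -> Dict[Enum, FunctionPair]:
        ...

    @abstractmethod
    def _assemble_datatype_instance(self) -> Any:
        ...

    def get_component(self, enum_member: Enum) -> Any:
        return self._components[enum_member]

    def assemble_datatype_instance(self) -> None:
        # Resolve every component first, so the subclass hook never sees a gap.
        self._components = {
            member: (active() if member in self._active_enums else default())
            for member, (default, active) in self._mapping.items()
        }
        self.datatype_instance = self._assemble_datatype_instance()


class SketchPoint(SketchBase):
    def _set_enum_func_mapping(self) -> Dict[Enum, FunctionPair]:
        return {
            EnumPoint.X: (lambda: 0.0, lambda: 1.5),
            EnumPoint.Y: (lambda: 0.0, lambda: -2.0),
        }

    def _assemble_datatype_instance(self) -> Any:
        return (self.get_component(EnumPoint.X), self.get_component(EnumPoint.Y))


point = SketchPoint({EnumPoint.X})
print(point.datatype_instance)  # (1.5, 0.0)
```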
Using message abstractions for testing¶
def test_gRPC_getInstanceTypes(grpc_channel, project_setup):
    _, proj_uuid = project_setup

    ### check for instances on images
    images_uuids, _, _ = send_imgs.send_labeled_images(proj_uuid, grpc_channel)
    pcl_lst: List[PointCloud2.PointCloud2] = send_pcl.send_pointcloud(
        proj_uuid, grpc_channel
    )
    img_uuid2point_map: Dict[str, PointStamped.PointStamped] = (
        send_points.send_points(proj_uuid, grpc_channel)
    )

    # extract images, ignore image uuids
    images = [img[1] for img in images_uuids]

    # retrieve the label instances of the bounding boxes
    label_instances = []
    for image in images:
        for label_cat in image.labels:
            for instance in label_cat.labels:
                label_instances.append(instance.instanceUuid)

    serv_man = ServiceManager(grpc_channel)
    queryinst_builder = FbQueryInstance(
        grpc_channel, enum_types={EnumFbQueryInstance.DATATYPE}
    )
    queryinst_builder.set_active_function(
        EnumFbQueryInstance.DATATYPE, lambda: Datatype.Datatype.Image
    )
    queryinst_builder.assemble_datatype_instance()

    instance_uuidspp = serv_man.call_get_instances_fb(
        queryinst_builder.builder, queryinst_builder.datatype_instance
    )

    label_instances = sorted(label_instances)
    instance_uuids = get_sorted_uuids_per_proj(instance_uuidspp)
    assert instance_uuids == label_instances
In this function all the possible datatypes with attached instances are tested (this is just a snippet of that particular function).
The message abstractions are used by first creating an object of FbQueryInstance
and setting the only active enum to EnumFbQueryInstance.DATATYPE using the
enum_types parameter on initialization, as this is the only component relevant
to the test. It would also be relatively easy to test different sets of
components together, e.g. testing for interference when specific datatypes are
set while a polygon is used to restrict the relevant area.
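One way to enumerate such sets of components systematically is to generate all small subsets of the enum and use each one as an enum_types argument. The sketch below uses a hypothetical EnumExample and helper component_subsets; it is not part of the existing test suite.

```python
from enum import Enum, auto
from itertools import combinations
from typing import FrozenSet, Iterator, Type


class EnumExample(Enum):
    DATATYPE = auto()
    POLYGON = auto()
    TIMEINTERVAL = auto()


def component_subsets(enum_type: Type[Enum], max_size: int) -> Iterator[FrozenSet[Enum]]:
    """Yield every subset of enum members with at most max_size elements."""
    members = list(enum_type)
    for size in range(max_size + 1):
        for combo in combinations(members, size):
            yield frozenset(combo)


variations = list(component_subsets(EnumExample, max_size=2))
print(len(variations))  # 7
```

Each resulting set could then be handed to a builder on construction, for example through pytest.mark.parametrize, so that one test body covers all combinations.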
After that, set_active_function() is used to replace the function pointer of
the active component. This essentially sets the second element of the
MsgsFunctions entry in the dictionary returned by _set_enum_func_mapping(),
which is discussed in the previous section.
Lastly, if the dictionary has been changed using set_active_function() or
set_mapped_functions(), assemble_datatype_instance() has to be called to
reassemble the underlying datatype.
Note: On creation of the instance the datatype is assembled automatically in its constructor.
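The need for the explicit reassembly step can be demonstrated with a reduced stand-in. SketchBuilder is a hypothetical single-component class that mimics only the caching behaviour described above; it does not use flatbuffers or the real MsgsFb.

```python
from typing import Any, Callable


class SketchBuilder:
    """Hypothetical single-component builder that caches its assembled value."""

    def __init__(self) -> None:
        self._active_function: Callable[[], Any] = lambda: "Image"
        self.assemble_datatype_instance()  # assembled once on construction

    def set_active_function(self, func: Callable[[], Any]) -> None:
        # Only swaps the callable; the cached instance is left untouched.
        self._active_function = func

    def assemble_datatype_instance(self) -> None:
        self.datatype_instance = self._active_function()


builder = SketchBuilder()
builder.set_active_function(lambda: "PointCloud")
stale = builder.datatype_instance   # still the value built in __init__
builder.assemble_datatype_instance()
fresh = builder.datatype_instance   # reflects the new active function

print(stale, fresh)  # Image PointCloud
```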
Now the assembled datatype can be accessed using the property
queryinst_builder.datatype_instance, and the underlying flatbuffers builder can
be accessed using queryinst_builder.builder for further use, e.g. as in this
case for calling the service function call_get_instances_fb() using the
ServiceManager.
Note: The MsgsBase class provides its own ServiceManager property for building
components, but that one shouldn't be used as it could change in the future.
Another snippet to highlight is the following, where one of the components of
the datatype is itself built by a class inheriting from MsgsFb.
def test_gRPC_getInstanceQueryTimeinterval(grpc_channel, project_setup):
    _, proj_uuid = project_setup
    serv_man = ServiceManager(grpc_channel)

    # only send pictures to ease the testing process
    images_uuids, _, _ = send_imgs.send_labeled_images(proj_uuid, grpc_channel)

    # extract images, ignore image uuids
    images = [img[1] for img in images_uuids]

    ### time interval tests
    time_offset = 1000
    cur_time = int(time.time())
    min_time_ = cur_time - time_offset
    max_time_ = cur_time + time_offset

    query_builder = FbQuery(grpc_channel, enum_types={EnumFbQuery.TIMEINTERVAL})
    queryinst_builder = FbQueryInstance(
        grpc_channel, enum_types={EnumFbQueryInstance.QUERY}
    )

    # test for a time interval that contains the sent images
    min_timestamp = fbh.createTimeStamp(queryinst_builder.builder, min_time_, 0)
    max_timestamp = fbh.createTimeStamp(queryinst_builder.builder, max_time_, 0)
    img_time_interval = fbh.createTimeInterval(
        queryinst_builder.builder, min_timestamp, max_timestamp
    )

    query_builder.set_active_function(
        EnumFbQuery.TIMEINTERVAL, lambda: img_time_interval
    )
    queryinst_builder.set_active_function(
        EnumFbQueryInstance.QUERY, lambda: query_builder.datatype_instance
    )
    # the inner builder has to be assembled before the outer one
    query_builder.assemble_datatype_instance()
    queryinst_builder.assemble_datatype_instance()

    instance_uuids_intimeinterval = get_sorted_uuids_per_proj(
        serv_man.call_get_instances_fb(
            queryinst_builder.builder, queryinst_builder.datatype_instance
        )
    )

    # test for a time interval that lies entirely before the sent images
    min_time_ = cur_time - 2 * time_offset
    max_time_ = cur_time - time_offset
    min_timestamp = fbh.createTimeStamp(queryinst_builder.builder, min_time_, 0)
    max_timestamp = fbh.createTimeStamp(queryinst_builder.builder, max_time_, 0)
    img_time_interval = fbh.createTimeInterval(
        queryinst_builder.builder, min_timestamp, max_timestamp
    )

    query_builder.set_active_function(
        EnumFbQuery.TIMEINTERVAL, lambda: img_time_interval
    )
    query_builder.assemble_datatype_instance()
    queryinst_builder.assemble_datatype_instance()

    instance_uuids_outtimeinterval = get_sorted_uuids_per_proj(
        serv_man.call_get_instances_fb(
            queryinst_builder.builder, queryinst_builder.datatype_instance
        )
    )

    label_instances = get_instances_from_imgs(images)
    assert len(instance_uuids_outtimeinterval) == 0
    assert sorted(label_instances) == sorted(instance_uuids_intimeinterval)
Here query_builder is used to build the Query.Query datatype in order to supply
it to the query component of queryinst_builder. It is important that
query_builder.assemble_datatype_instance() is called before
queryinst_builder.assemble_datatype_instance(), otherwise the changes made by
setting the active function on query_builder are not reflected in
queryinst_builder.datatype_instance.
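Why the order matters can be seen in a reduced model of two nested builders. SketchBuilder below is a hypothetical stand-in that only caches the result of its active function; strings and dictionaries take the place of the flatbuffers objects.

```python
from typing import Any, Callable


class SketchBuilder:
    """Hypothetical builder that caches the result of its active function."""

    def __init__(self, active_function: Callable[[], Any]) -> None:
        self._active_function = active_function
        self.assemble_datatype_instance()

    def set_active_function(self, func: Callable[[], Any]) -> None:
        self._active_function = func

    def assemble_datatype_instance(self) -> None:
        self.datatype_instance = self._active_function()


inner = SketchBuilder(lambda: "interval-A")
# The outer builder reads the inner builder's cached instance when it assembles.
outer = SketchBuilder(lambda: {"query": inner.datatype_instance})

inner.set_active_function(lambda: "interval-B")

# Wrong order: the outer builder is rebuilt while the inner one is still stale.
outer.assemble_datatype_instance()
inner.assemble_datatype_instance()
wrong_order = outer.datatype_instance

# Right order: inner first, then outer.
inner.assemble_datatype_instance()
outer.assemble_datatype_instance()
right_order = outer.datatype_instance

print(wrong_order)  # {'query': 'interval-A'}
print(right_order)  # {'query': 'interval-B'}
```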
Inner workings of the MsgsFb and MsgsBase classes¶
Meaning of symbols and notations in this diagram:
- Rectangular boxes: instance methods
- Ellipses: instance variables
- Line arrows (with text): calls to methods or setting variables (with the help of those variables specified by the text)
- Line arrows with numbers: show the order in which things are done
- Dotted arrows outgoing: getter methods/properties
- Dotted arrows incoming: setter methods/properties
- Red colored text: functionality already used in the examples above
- Purple colored text: @abstractmethod, also used above
- Blue colored text: has a special meaning, is not mirrored exactly by the implementation
Note: Some details are not shown, like the validation methods for the given enum type, but they are not necessary to understand the structure.
First, in the initialization phase the variables _builder, _service_manager,
_channel and _active_enums are set and managed by the MsgsFb instance; they are
needed for the assembly of the datatype instance later. _builder is simply a
flatbuffers builder. _channel holds the grpc_channel and is used to create the
ServiceManager instance. Lastly, _active_enums is a set of enum elements that
specifies which components of the datatype are "active" (i.e. which components
are set by the active_function of the MsgsFunctions class).
After that, the _enum_func_mapping variable is set by the
_set_enum_func_mapping() function. This variable can also be manipulated by
set_active_function() or set_mapped_functions(). Then _assemble_components()
is called, which makes sure that the _components are set, i.e. the functions in
the _enum_func_mapping are called and each component is set by its
active_function if its corresponding enum is in the _active_enums set, or by
its default_function otherwise.
Note: _components are implemented as multiple instance variables created
dynamically at runtime, each named after its enum element.
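Dynamically created instance variables of this kind are usually managed with setattr() and getattr(). The following sketch shows the general technique with a hypothetical ComponentStore class; whether MsgsFb stores and reads its components exactly this way is an assumption.

```python
from enum import Enum, auto
from typing import Any, Dict


class EnumExample(Enum):
    LABEL = auto()
    WITHOUTDATA = auto()


class ComponentStore:
    def set_components(self, values: Dict[EnumExample, Any]) -> None:
        for member, value in values.items():
            # Creates e.g. self.LABEL and self.WITHOUTDATA at runtime.
            setattr(self, member.name, value)

    def get_component(self, member: EnumExample) -> Any:
        return getattr(self, member.name)


store = ComponentStore()
store.set_components({EnumExample.LABEL: "person", EnumExample.WITHOUTDATA: False})
print(store.get_component(EnumExample.LABEL))  # person
print(sorted(vars(store)))                     # ['LABEL', 'WITHOUTDATA']
```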
Finally, the assemble_datatype_instance() method, which is callable on the
instance, triggers a rebuild of the _assembled_datatype_instance by first
refreshing the components and then calling the _assemble_datatype_instance()
method. The assemble_datatype_instance() method is also called in the
__init__() method, so that the message abstractions always try to guarantee a
set datatype_instance variable.