注:0.5.0版本
Apache Griffin measure module needs two configuration files to define the parameters of execution, one is for environment, the other is for dq job.
Environment Parameters
{
"spark": {
"log.level": "WARN",
"checkpoint.dir": "hdfs:///griffin/streaming/cp",
"batch.interval": "1m",
"process.interval": "5m",
"config": {
"spark.default.parallelism": 5,
"spark.task.maxFailures": 5,
"spark.streaming.kafkaMaxRatePerPartition": 1000,
"spark.streaming.concurrentJobs": 4,
"spark.yarn.maxAppAttempts": 5,
"spark.yarn.am.attemptFailuresValidityInterval": "1h",
"spark.yarn.max.executor.failures": 120,
"spark.yarn.executor.failuresValidityInterval": "1h",
"spark.hadoop.fs.hdfs.impl.disable.cache": true
}
},
"sinks": [
{
"type": "console",
"config": {
"max.log.lines": 100
}
}, {
"type": "hdfs",
"config": {
"path": "hdfs:///griffin/streaming/persist",
"max.lines.per.file": 10000
}
}
],
"griffin.checkpoint": [
{
"type": "zk",
"config": {
"hosts": "<zookeeper host ip>:2181",
"namespace": "griffin/infocache",
"lock.path": "lock",
"mode": "persist",
"init.clear": true,
"close.clear": false
}
}
]
}
Above lists environment parameters.
-
spark: This field configures spark and spark streaming parameters.
- log.level: Level of spark log.
- checkpoint.dir: Check point directory of spark streaming, for streaming mode.
- batch.interval: Interval of dumping streaming data, for streaming mode.
- process.interval: Interval of processing dumped streaming data, for streaming mode.
- config: Configuration of spark parameters.
-
sinks: This field configures list of metrics sink parameters, multiple sink ways are supported. Details of sink configuration here.
-
griffin.checkpoint: This field configures list of griffin checkpoint parameters, multiple cache ways are supported. It is only for streaming dq case. Details of info cache configuration here.
Sinks
-
type: Metrics and records sink type, "console", "hdfs", "http", "mongo".
-
config: Configure parameters of each sink type.
- console sink (aliases: "log")
- max.log.lines: the max lines of log.
- hdfs sink
- path: hdfs path to sink metrics
- max.persist.lines: the max lines of total sink data.
- max.lines.per.file: the max lines of each sink file.
- http sink (aliases: "es", "elasticsearch")
- api: api to submit sink metrics.
- method: http method, "post" default.
- mongo sink
- url: url of mongo db.
- database: database name.
- collection: collection name.
- console sink (aliases: "log")
Griffin Checkpoint
-
type: Griffin checkpoint type, "zk" for zookeeper checkpoint.
-
config: Configure parameters of griffin checkpoint type.
- zookeeper checkpoint
- hosts: zookeeper hosts list as a string, separated by comma.
- namespace: namespace of cache info, "" as default.
- lock.path: path of lock info, "lock" as default.
- mode: create mode of zookeeper node, "persist" as default.
- init.clear: clear cache info when initialize, true default.
- close.clear: clear cache info when close connection, false default.
- zookeeper checkpoint
DQ Job Parameters
{
"name": "accu_batch",
"process.type": "BATCH",
"data.sources": [
{
"name": "src",
"connectors": [
{
"type": "AVRO",
"version": "1.7",
"config": {
"file.path": "<path>/<to>",
"file.name": "<source-file>.avro"
}
}
]
}, {
"name": "tgt",
"connectors": [
{
"type": "AVRO",
"version": "1.7",
"config": {
"file.path": "<path>/<to>",
"file.name": "<target-file>.avro"
}
}
]
}
],
"evaluate.rule": {
"rules": [
{
"dsl.type": "griffin-dsl",
"dq.type": "ACCURACY",
"out.dataframe.name": "accu",
"rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
"details": {
"source": "source",
"target": "target",
"miss": "miss_count",
"total": "total_count",
"matched": "matched_count"
},
"out": [
{
"type": "metric",
"name": "accu"
},
{
"type": "record"
}
]
}
]
},
"sinks": ["CONSOLE", "HTTP", "HDFS"]
}
Above lists DQ job configure parameters.
-
name: Name of DQ job.
-
process.type: Process type of DQ job, "BATCH" or "STREAMING".
-
data.sources: List of data sources in this DQ job.
- name: Name of this data source, it should be different from other data sources.
- connectors: List of data connectors combined as the same data source. Details of data connector configuration here.
-
evaluate.rule: Evaluate rule parameters of this DQ job.
- dsl.type: Default dsl type of all the rules.
- rules: List of rules, to define every rule step. Details of rule configuration here.
-
sinks: Whitelisted sink types for this job. Note: no sinks will be used, if empty or omitted.
Data Connector
-
type: Data connector type: "AVRO", "HIVE", "TEXT-DIR", "CUSTOM" for batch mode; "KAFKA", "CUSTOM" for streaming mode.
-
version: Version string of data connector type.
-
config: Configure parameters of each data connector type.
- avro data connector
- file.path: avro file path, optional, "" as default.
- file.name: avro file name.
- hive data connector
- database: data base name, optional, "default" as default.
- table.name: table name.
- where: where conditions string, split by ",", optional.
e.g.dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15
- text dir data connector
- dir.path: parent directory path.
- data.dir.depth: integer, depth of data directories, 0 as default.
- success.file: success file name,
- done.file:
- custom connector
- class: class name for user-provided data connector implementation. For Batch
it should be implementing BatchDataConnector trait and have static method with signaturedef apply(ctx: BatchDataConnectorContext): BatchDataConnector.
For Streaming, it should be implementing StreamingDataConnector and have static methoddef apply(ctx: StreamingDataConnectorContext): StreamingDataConnector. User-provided
data connector should be present in Spark job's class path, by providing custom jar as -jar parameter
to spark-submit or by adding to "jars" list in sparkProperties.json.
- class: class name for user-provided data connector implementation. For Batch
- avro data connector
Rule
-
dsl.type: Rule dsl type, "spark-sql", "df-ops" and "griffin-dsl".
-
dq.type: DQ type of this rule, only for "griffin-dsl" type. Supported types: "ACCURACY", "PROFILING", "TIMELINESS", "UNIQUENESS", "COMPLETENESS".
-
out.dataframe.name (step information): Output table name of this rule, could be used in the following rules.
-
in.dataframe.name (step information): Input table name of this rule, only used for "df-ops" type.
-
rule: The rule string.
-
details: Details of this rule, optional.
- accuracy dq type detail configuration
- source: the data source name which as source in accuracy, default is the name of first data source in "data.sources" if not configured.
- target: the data source name which as target in accuracy, default is the name of second data source in "data.sources" if not configured.
- miss: the miss count name in metric, optional.
- total: the total count name in metric, optional.
- matched: the matched count name in metric, optional.
- profiling dq type detail configuration
- source: the data source name which as source in profiling, default is the name of first data source in "data.sources" if not configured. If the griffin-dsl rule contains from clause, this parameter is ignored.
- distinctness dq type detail configuration
- source: name of data source to measure uniqueness.
- target: name of data source to compare with. It is always the same as source, or more than source.
- distinct: the unique count name in metric, optional.
- total: the total count name in metric, optional.
- dup: the duplicate count name in metric, optional.
- accu_dup: the accumulate duplicate count name in metric, optional, only in streaming mode and "with.accumulate" enabled.
- num: the duplicate number name in metric, optional.
- duplication.array: optional, if set as a non-empty string, the duplication metric will be computed, and the group metric name is this string.
- with.accumulate: optional, default is true, if set as false, in streaming mode, the data set will not compare with old data to check distinctness.
- uniqueness dq type detail configuration
- source: name of data source to measure uniqueness.
- target: name of data source to compare with. It is always the same as source, or more than source.
- unique: the unique count name in metric, optional.
- total: the total count name in metric, optional.
- dup: the duplicate count name in metric, optional.
- num: the duplicate number name in metric, optional.
- duplication.array: optional, if set as a non-empty string, the duplication metric will be computed, and the group metric name is this string.
- completeness dq type detail configuration
- source: name of data source to measure completeness.
- total: name of data source to compare with. It is always the same as source, or more than source.
- complete: the column name in metric, optional. The number of not null values.
- incomplete: the column name in metric, optional. The number of null values.
- timeliness dq type detail configuration
- source: name of data source to measure timeliness.
- latency: the latency column name in metric, optional.
- total: column name, optional.
- avg: column name, optional. The average latency.
- step: column nmae, optional. The histogram where "bin" is step=floor(latency/step.size).
- count: column name, optional. The number of the same latencies in the concrete step.
- percentile: column name, optional.
- threshold: optional, if set as a time string like "1h", the items with latency more than 1 hour will be record.
- step.size: optional, used to build the histogram of latencies, in milliseconds (ex. "100").
- percentile.values: optional, used to compute the percentile metrics, values between 0 and 1. For instance, We can see fastest and slowest latencies if set [0.1, 0.9].
- accuracy dq type detail configuration
-
cache: Cache output dataframe. Optional, valid only for "spark-sql" and "df-ops" mode. Defaults to
falseif not specified. -
out: List of output sinks for the job.
- Metric output.
- type: "metric"
- name: Metric name, semantics depends on "flatten" field value.
- flatten: Aggregation method used before sending data frame result into the sink:
- default: use "array" if data frame returned multiple records, otherwise use "entries"
- entries: sends first row of data frame as metric results, like like
{"agg_col": "value"} - array: wraps all metrics into a map, like
{"my_out_name": [{"agg_col": "value"}]} - map: wraps first row of data frame into a map, like
{"my_out_name": {"agg_col": "value"}}
- Record output. Currently handled only by HDFS sink.
- type: "record"
- name: File name within sink output folder to dump files to.
- Data source cache update for streaming jobs.
- type: "dsc-update"
- name: Data source name to update cache.
- Metric output.