I've been building parameterized pipelines in ADF v2 for about eight months now. It's time to do a proper deep dive — how parameterization works, the expression language, the patterns that hold up in production, and the gotchas I've run into.
If you've been following along, you know parameterization is the feature I've been asking for since 2014. Now that it exists, let's use it properly.
Defining Parameters
Parameters are defined at the pipeline level and optionally at the dataset level. A pipeline parameter has a name, a type (String, Int, Float, Bool, Array, Object), and an optional default value.
{
"name": "IncrementalIngestPipeline",
"parameters": {
"SourceTable": { "type": "String" },
"TargetTable": { "type": "String" },
"WatermarkColumn": { "type": "String" },
"LastWatermark": { "type": "String", "defaultValue": "1900-01-01" },
"BatchSize": { "type": "Int", "defaultValue": 100000 }
}
}
Dataset parameters work similarly. A parameterized dataset can have its connection, schema, or path driven by values passed at runtime:
{
"name": "ParameterizedSQLSource",
"parameters": {
"TableName": { "type": "String" },
"SchemaName": { "type": "String", "defaultValue": "dbo" }
},
"typeProperties": {
"tableName": {
"value": "@concat(dataset().SchemaName, '.', dataset().TableName)",
"type": "Expression"
}
}
}
The Expression Language
Expressions in ADF v2 are evaluated at runtime. Any field that supports expressions can be set to a static value or an expression — the field type in the JSON is either a plain string/number or an object with "value" and "type": "Expression".
The functions available:
String operations:
@concat(pipeline().parameters.SourceTable, '_', formatDateTime(utcnow(), 'yyyyMMdd'))
@substring(pipeline().parameters.TableName, 0, 10)
@toLower(pipeline().parameters.SchemaName)
@replace(pipeline().parameters.FilePath, '/', '\')
Date and time:
@formatDateTime(pipeline().parameters.StartDate, 'yyyy-MM-dd')
@addDays(trigger().scheduledTime, -1)
@addHours(trigger().outputs.windowStartTime, 1)
@startOfDay(utcnow())
Conditional:
@if(equals(pipeline().parameters.LoadType, 'Full'),
'TRUNCATE TABLE TargetTable',
concat('DELETE FROM TargetTable WHERE ModifiedDate > ''', pipeline().parameters.LastWatermark, ''''))
Activity output references:
@activity('LookupWatermark').output.firstRow.WatermarkValue
@activity('CopyData').output.rowsCopied
@activity('CopyData').output.dataWritten
A Complete Generic Ingest Pipeline
Here's the pattern I've settled on for incremental SQL ingest. The pipeline has five activities in sequence:
1. LookupCurrentWatermark: reads the current high-watermark for this table from the control table.
{
"name": "LookupCurrentWatermark",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "@concat('SELECT WatermarkValue FROM dbo.ControlTable WHERE TableName = ''', pipeline().parameters.SourceTable, '''')",
"type": "Expression"
}
}
}
}
2. CopyIncrementalData: copies rows newer than the watermark.
{
"name": "CopyIncrementalData",
"type": "Copy",
"dependsOn": [{ "activity": "LookupCurrentWatermark", "dependencyConditions": ["Succeeded"] }],
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": {
"value": "@concat('SELECT * FROM dbo.[', pipeline().parameters.SourceTable, '] WHERE [', pipeline().parameters.WatermarkColumn, '] > ''', activity('LookupCurrentWatermark').output.firstRow.WatermarkValue, '''')",
"type": "Expression"
}
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000,
"sqlWriterStoredProcedureName": {
"value": "@concat('usp_Upsert_', pipeline().parameters.TargetTable)",
"type": "Expression"
}
}
}
}
3. UpdateWatermark: writes the new high-watermark back to the control table.
{
"name": "UpdateWatermark",
"type": "SqlServerStoredProcedure",
"dependsOn": [{ "activity": "CopyIncrementalData", "dependencyConditions": ["Succeeded"] }],
"typeProperties": {
"storedProcedureName": "usp_UpdateControlTable",
"storedProcedureParameters": {
"TableName": { "value": "@pipeline().parameters.SourceTable", "type": "Expression" },
"NewWatermark": { "value": "@activity('CopyIncrementalData').output.dataWritten", "type": "Expression" }
}
}
}
The Gotchas
Parameters must be set at trigger time. You cannot compute a parameter value inside a pipeline from another activity's output and then use it as a parameter for a child pipeline (via Execute Pipeline activity). Parameters flow in, not from activity to activity across pipeline boundaries. If you need an activity output to drive child pipeline behavior, pass it through the Execute Pipeline parameters using an expression — but the expression is evaluated in the parent pipeline's context, not re-evaluated in the child.
Expression syntax errors fail silently until runtime. A malformed expression doesn't fail at deployment — it fails when the activity runs. Test your expressions with simple values before deploying to production, and test the edge cases (null watermark, empty table name).
String concatenation with quotes requires escaping. Building SQL query strings via concat with single quotes in the query requires doubling the quotes: ''' becomes '' in the SQL, which is a single quote. It's readable once you understand it, ugly until you do.
The ForEach gap. Parameters exist. The looping construct to drive those parameters from a config table doesn't ship until later this year. Until ForEach is available, you can't build a fully metadata-driven framework purely within ADF. I'm still bridging with a stored procedure that queues work and an external trigger.
Is It Worth Using Now?
Yes. Even without ForEach, parameterized pipelines are dramatically better than the per-table JSON files of v1. If you're building new workloads in ADF v2, use parameterization from day one. If you're running v1, start building generic parameterized versions of your most common pipeline patterns now — you'll migrate them cleanly when the rest of the framework comes together.
When ForEach ships, I'll write the complete metadata-driven framework walkthrough. Until then, this pattern gives you the building block. I'm here to help.