Example 2: Pipe flow with uniform source
Problem Setup¶
We consider a two-dimensional pipe geometry with a prescribed half-Poiseuille velocity field aligned with the -axis:
The profile vanishes at the top wall () and attains its maximum on the bottom boundary (), which is treated as a symmetry plane. A uniform volumetric source ms is applied throughout the domain, representing steady production of the transported species.
The governing equation is:
Boundary Conditions¶
The rectangular domain has four disjoint boundaries, each carrying conditions for both the advection and diffusion operators:
| Boundary | Location | Advection | Diffusion | |
|---|---|---|---|---|
| Inlet | Inflow, prescribed weakly | Dirichlet, (SIPG) | ||
| Outlet | Outflow, interior trace used | Homogeneous Neumann | ||
| Wall | 0 | No advective flux | Homogeneous Neumann | |
| Symmetry | 0 | No advective flux | Homogeneous Neumann |
Because the inflow and outflow boundaries are known a priori, the indicator is only required on interior faces. The wall and symmetry boundaries satisfy exactly by construction, so their advective face integrals vanish and no explicit term is added.
Source
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
matplotlib.rcParams['svg.fonttype'] = 'none'
fig, ax = plt.subplots(figsize=(7.0, 3.5))
ax.add_patch(patches.Rectangle((0, 0), 10, 2, facecolor='white', edgecolor='black', linewidth=1.5, zorder=1))
ax.text(5.0, 1.0, r'$\Omega$', ha='center', va='center', fontsize=18)
ax.text(-0.5, 1.0, r"$\Gamma_{\mathrm{inflow}}$", ha='right', va='center', fontsize=14)
ax.text(5.0, 2.3, r"$\Gamma_{\mathrm{wall}}$", ha='center', va='center', fontsize=14)
ax.text(10.5, 1.0, r'$\Gamma_{\mathrm{outflow}}$', ha='left', va='center', fontsize=14)
ax.text(5.0, -0.3, r'$\Gamma_{\mathrm{symmetry}}$', ha='center', va='center', fontsize=14)
n_arrows = 7
ys = np.linspace(0, 1.8, n_arrows)
for yi in ys:
length = 3 * (1 - (yi/2))**2
ax.annotate('', xy=(0.7+length, yi+0.1), xytext=(0.3, yi+0.1),
arrowprops=dict(arrowstyle='->', color='black', lw=2), zorder=2)
ax.text(3.1, 0.25, r'$\mathbf{w}$', fontsize=16, ha='left')
ax.set_xlim(-0.5, 10.5)
ax.set_ylim(-0.5, 2.5)
ax.axis('off')
plt.tight_layout()
fig.savefig('../images/domain_mwe2.svg', bbox_inches='tight')
plt.close(fig)Problem formulation¶
The weak form is the same as in Example 1. What changes is how the boundary conditions are applied and how the velocity is stored. These points are covered alongside the relevant code below.
from mpi4py import MPI
from petsc4py import PETSc
import numpy as np
import ufl
import dolfinx
import basix.ufl
from dolfinx import fem, mesh
from dolfinx.fem.petsc import NonlinearProblem
inlet_x = 0.0
outlet_x = 5.0
height = 1.0
msh = mesh.create_rectangle(
MPI.COMM_WORLD,
points=((inlet_x, 0.0), (outlet_x, height)),
n=(50, 10),
cell_type=mesh.CellType.triangle,
diagonal=mesh.DiagonalType.crossed,
)
V = fem.functionspace(msh, ("DG", 1))
u = fem.Function(V)
v = ufl.TestFunction(V)
n = ufl.FacetNormal(msh)
h = ufl.CellDiameter(msh)We take ms and ms, giving a Péclet number based on the pipe length m. This means advection is the dominant transport mechanism, with diffusion playing only a minor role.
Unlike Example 1, the velocity here varies in space rather than being constant. We store it in a continuous Lagrange space (CG), which means takes the same value on both sides of every interior face. Because does not jump across faces, we can reuse exactly the same upwind flux as in Example 1 without having to add any extra terms to handle a discontinuous velocity.
Source
D_value = 0.1
w_max = 10
D = fem.Constant(msh, PETSc.ScalarType(D_value))
# Velocity: half-Poiseuille profile in a continuous CG2 space
vel_el = basix.ufl.element(
"Lagrange", msh.topology.cell_name(), 2, shape=(msh.geometry.dim,)
)
V_vel = fem.functionspace(msh, vel_el)
w = fem.Function(V_vel, name="velocity")
def velocity_func(x):
values = np.zeros((2, x.shape[1]))
values[0] = w_max * (1 - x[1] ** 2)
return values
w.interpolate(velocity_func)
# Render the velocity magnitude with pyvista
import pyvista
from dolfinx import plot
topology, cell_types, geometry = plot.vtk_mesh(V_vel)
w_grid = pyvista.UnstructuredGrid(topology, cell_types, geometry)
w_values = w.x.array.reshape(-1, msh.geometry.dim)
# Pad 2D velocity to 3D for glyph orientation
w_3d = np.zeros((w_values.shape[0], 3))
w_3d[:, :2] = w_values
w_grid["w"] = w_3d
w_grid["w_mag"] = np.linalg.norm(w_values, axis=1)
glyphs = w_grid.glyph(orient="w", scale="w_mag", factor=0.04, tolerance=0.03)
plotter = pyvista.Plotter(off_screen=True, window_size=(1600, 500))
field_actor = plotter.add_mesh(
w_grid, scalars="w_mag", cmap="viridis", show_scalar_bar=False
)
plotter.add_mesh(glyphs, color="white", show_scalar_bar=False)
plotter.add_scalar_bar(
title="|w| (m s^-1)",
mapper=field_actor.mapper,
vertical=False,
width=0.5,
height=0.08,
position_x=0.25,
position_y=0.05,
)
plotter.view_xy()
plotter.camera.tight(padding=0.05, adjust_render_window=False)
plotter.save_graphic("../images/velocity_field_mwe2.svg")
plotter.close()We tag each of the four boundaries with a different integer marker, so that we can apply the correct boundary condition to each one using ds(tag). In Example 1 we used the indicator to pick out the inflow and outflow regions of the boundary on the fly. Here, the inlet and outlet locations are fixed by the geometry, so there is no need to work them out with ; we can just tag them once and apply the corresponding condition. We still need on the interior faces, because the upwind side there depends on the local velocity direction and changes from face to face.
The wall () and symmetry () boundaries have by construction of the velocity field, so no mass is carried across them by advection and nothing needs to be added. For diffusion, we want zero normal flux on these boundaries (homogeneous Neumann), which is the default in a DG formulation, so again nothing needs to be added.
# Mark boundary facets
tdim = msh.topology.dim
fdim = tdim - 1
msh.topology.create_entities(fdim)
inlet_id = 1 # x = 0
outlet_id = 2 # x = outlet_x
wall_id = 3 # y = 1
symmetry_id = 4 # y = 0
inlet_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[0], inlet_x))
outlet_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[0], outlet_x))
wall_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[1], height))
symmetry_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[1], 0.0))
facet_imap = msh.topology.index_map(fdim)
num_facets = facet_imap.size_local + facet_imap.num_ghosts
markers = np.zeros(num_facets, dtype=np.intc)
markers[inlet_facets] = inlet_id
markers[outlet_facets] = outlet_id
markers[wall_facets] = wall_id
markers[symmetry_facets] = symmetry_id
indices = np.arange(num_facets, dtype=np.intc)
mt = mesh.meshtags(msh, fdim, indices, markers)
ds = ufl.Measure("ds", domain=msh, subdomain_data=mt)
dS = ufl.dS
dx = ufl.dxThe bulk and interior-face terms are the same as in Example 1: the advection bulk integral, the upwind flux on interior faces, the diffusion bulk integral, and the SIPG terms on interior faces. Only the boundary terms differ.
On the inlet, we add two terms. The advection term weakly imposes the inflow value . Since on the inlet, only the inflow branch of the upwind flux is active here. The SIPG (Nitsche) term imposes the Dirichlet condition for diffusion. It has three pieces: a consistency term, a symmetry-restoring term that keeps the overall form symmetric in and , and a penalty of order that pulls the solution towards at the boundary.
On the outlet, the upwind flux uses the value of from inside the domain, giving . Diffusion is homogeneous Neumann, so no term is added.
Because we are using here, the pieces of these terms that would sit on the right-hand side (i.e. act as a source) vanish. Only the parts involving remain in .
# Upwind indicator on interior faces
lmbda = ufl.conditional(ufl.gt(ufl.dot(w, n), 0), 1, 0)
penalty = fem.Constant(msh, PETSc.ScalarType(10))
u_inlet = fem.Constant(msh, PETSc.ScalarType(0.0))
f_source = fem.Constant(msh, PETSc.ScalarType(1.0))
F = 0
# Advection: bulk + interior upwind flux
F += -ufl.inner(w * u, ufl.grad(v)) * dx
F += ufl.inner(2 * ufl.avg(lmbda * w * u), ufl.jump(v, n)) * dS
# Advection BCs: inflow (prescribed) at inlet, free outflow at outlet
F_inlet_adv = -ufl.inner((1 - lmbda) * ufl.dot(w, n) * u_inlet, v) * ds(inlet_id)
F_outlet_surf = ufl.inner(lmbda * ufl.dot(w, n) * u, v) * ds(outlet_id)
F += F_inlet_adv + F_outlet_surf
# Diffusion: bulk + interior SIPG
F += D * ufl.inner(ufl.grad(u), ufl.grad(v)) * dx
F += -D * ufl.inner(ufl.avg(ufl.grad(u)), ufl.jump(v, n)) * dS
F += -D * ufl.inner(ufl.jump(u, n), ufl.avg(ufl.grad(v))) * dS
F += D * (penalty / ufl.avg(h)) * ufl.inner(ufl.jump(u, n), ufl.jump(v, n)) * dS
# Diffusion BC: weak Dirichlet (SIPG Nitsche) at inlet only
F_inlet_nitsche = D * (
-ufl.inner(ufl.grad(u), v * n) * ds(inlet_id)
- ufl.inner(ufl.grad(v), (u - u_inlet) * n) * ds(inlet_id)
+ (penalty / h) * ufl.inner(u - u_inlet, v) * ds(inlet_id)
)
F += F_inlet_nitsche
F_inlet_terms = F_inlet_adv + F_inlet_nitsche
# Source
F += -ufl.inner(f_source, v) * dxThe residual is linear in , so even though we use NonlinearProblem, the Newton solver converges in a single step. The solver settings (a direct LU solver from PETSc) are the same as in Example 1 and handle a problem of this size easily.
problem = NonlinearProblem(
F, u, J=ufl.derivative(F, u),
petsc_options_prefix="advecdiff",
petsc_options={
"snes_type": "newtonls",
"snes_linesearch_type": "none",
"snes_stol": np.sqrt(np.finfo(dolfinx.default_real_type).eps) * 1e-2,
"snes_atol": 1e-10,
"snes_rtol": 1e-10,
"snes_max_it": 30,
"snes_divergence_tolerance": "PETSC_UNLIMITED",
"ksp_type": "preonly",
"pc_type": "lu",
"pc_factor_mat_solver_type": "petsc",
},
)
u = problem.solve()
u.x.scatter_forward()
We check the solution by looking at the global mass balance. At steady state, everything produced by the source has to leave through one of the four boundaries, so adding up the four boundary fluxes should give back the total source.
We compute each flux using the consistent flux technique. Once the solver has converged, the residual is zero at every degree of freedom (DOF). If we drop a boundary’s explicit surface terms from , assemble it, and sum the result over the DOFs next to that boundary, what remains is exactly the flux through that boundary (with a sign flip). The advantage of this approach over computing directly from the solution is that the consistent flux balances the source to machine precision, whereas a direct integral is only as accurate as the underlying numerical solution.
# --- Consistent flux verification ---
msh.topology.create_connectivity(fdim, tdim)
msh.topology.create_connectivity(tdim, tdim)
owned_size = V.dofmap.index_map.size_local * V.dofmap.index_map_bs
def get_owned_dofs(marker):
facets = mt.find(marker)
f_to_c = msh.topology.connectivity(fdim, tdim)
cells = np.unique(np.concatenate([f_to_c.links(f) for f in facets]))
dofs = fem.locate_dofs_topological(V, tdim, cells)
return dofs[dofs < owned_size]
inlet_dofs = get_owned_dofs(inlet_id)
outlet_dofs = get_owned_dofs(outlet_id)
wall_dofs = get_owned_dofs(wall_id)
symmetry_dofs = get_owned_dofs(symmetry_id)
def compute_consistent_flux(residual_form: fem.Form, dofs: np.ndarray):
residual = fem.assemble_vector(residual_form)
residual.scatter_reverse(dolfinx.la.InsertMode.add)
residual.scatter_forward()
local_flux = np.sum(residual.array[dofs])
return msh.comm.allreduce(local_flux, op=MPI.SUM)
# F(u_h) = 0 at the solution. Subtract each boundary's explicit surface terms
# from F, then summing the residual at boundary-adjacent DOFs isolates the
# negative of that boundary's flux contribution.
F_no_outlet = fem.form(F - F_outlet_surf)
F_no_inlet = fem.form(F - F_inlet_terms)
F_form = fem.form(F)
flux_outlet = -compute_consistent_flux(F_no_outlet, outlet_dofs)
flux_inlet = -compute_consistent_flux(F_no_inlet, inlet_dofs)
flux_wall = compute_consistent_flux(F_form, wall_dofs)
flux_symmetry = compute_consistent_flux(F_form, symmetry_dofs)
source_total = msh.comm.allreduce(
fem.assemble_scalar(fem.form(f_source * dx)), op=MPI.SUM
)
consist_total = flux_outlet + flux_wall + flux_symmetry + flux_inlet
def pct(val):
return 100 * val / source_total
print(f"Source integral : {source_total:.6e}")
print()
print(f"{'Flux outlet':20s} {flux_outlet:14.6e}")
print(f"{'Flux inlet':20s} {flux_inlet:14.6e}")
print(f"{'Flux wall':20s} {flux_wall:14.6e}")
print(f"{'Flux symmetry':20s} {flux_symmetry:14.6e}")
print()
c_bal = source_total - consist_total
print(f"{'Total flux':20s} {consist_total:14.6e}")
print(f"{'Balance residual':20s} {c_bal:14.6e} ({pct(c_bal):+.2f}%)")Source integral : 5.000000e+00
Flux outlet 4.976947e+00
Flux inlet 2.305309e-02
Flux wall -2.689387e-15
Flux symmetry 7.004780e-16
Total flux 5.000000e+00
Balance residual 1.074696e-13 (+0.00%)
Interpretation¶
With the problem is strongly advection-dominated. Almost all of the produced mass leaves through the outlet. The inlet carries a small flux, caused by diffusion pushing a little mass back against the imposed condition. The wall and symmetry boundaries carry no flux to machine precision: advection is zero there because , and we did not add any diffusive surface term, so the consistent-flux calculation simply returns zero up to roundoff.
The four fluxes add up to the source integral to floating-point roundoff, which confirms that the DG discretisation conserves mass exactly at the discrete level.
Péclet sweep¶
Fixing the diffusion coefficient ms and the domain length m, we vary over six orders of magnitude to sweep the Péclet number from 10-3 (diffusion-dominated) to 103 (advection-dominated).
At low Pe, diffusion spreads the uniform source roughly symmetrically and mass leaves through both the inlet and outlet in comparable amounts. As Pe grows, advection sweeps the mass downstream and the outlet carries an increasing share of the flux, while the inlet contribution falls towards zero. In every case the inlet and outlet fluxes sum exactly to the source integral, independent of the regime, so flux balance is preserved across the full sweep.
Source
from mpi4py import MPI
from petsc4py import PETSc
import numpy as np
import ufl
import dolfinx
import dolfinx.la
import basix.ufl
import matplotlib
import matplotlib.pyplot as plt
from dolfinx import fem, mesh
from dolfinx.fem.petsc import NonlinearProblem
matplotlib.rcParams['svg.fonttype'] = 'none'
def run_advection_diffusion(w_max, D_value):
inlet_x = 0.0
outlet_x = 5.0
height = 1.0
msh = mesh.create_rectangle(
MPI.COMM_WORLD,
points=((inlet_x, 0.0), (outlet_x, height)),
n=(50, 10),
cell_type=mesh.CellType.triangle,
diagonal=mesh.DiagonalType.crossed,
)
deg = 1
V = fem.functionspace(msh, ("DG", deg))
u = fem.Function(V)
v = ufl.TestFunction(V)
n = ufl.FacetNormal(msh)
h = ufl.CellDiameter(msh)
D = fem.Constant(msh, PETSc.ScalarType(D_value))
vel_el = basix.ufl.element(
"Lagrange", msh.topology.cell_name(), 2, shape=(msh.geometry.dim,)
)
V_vel = fem.functionspace(msh, vel_el)
w = fem.Function(V_vel, name="velocity")
def velocity_func(x):
values = np.zeros((2, x.shape[1]))
values[0] = w_max * (1 - x[1] ** 2)
return values
w.interpolate(velocity_func)
penalty = fem.Constant(msh, PETSc.ScalarType(10 * deg**2))
tdim = msh.topology.dim
fdim = tdim - 1
msh.topology.create_entities(fdim)
inlet_id = 1
outlet_id = 2
wall_id = 3
symmetry_id = 4
inlet_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[0], inlet_x))
outlet_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[0], outlet_x))
wall_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[1], height))
symmetry_facets = mesh.locate_entities_boundary(msh, fdim, lambda x: np.isclose(x[1], 0.0))
facet_imap = msh.topology.index_map(fdim)
num_facets = facet_imap.size_local + facet_imap.num_ghosts
markers = np.zeros(num_facets, dtype=np.intc)
markers[inlet_facets] = inlet_id
markers[outlet_facets] = outlet_id
markers[wall_facets] = wall_id
markers[symmetry_facets] = symmetry_id
indices = np.arange(num_facets, dtype=np.intc)
mt = mesh.meshtags(msh, fdim, indices, markers)
ds = ufl.Measure("ds", domain=msh, subdomain_data=mt)
dS = ufl.dS
dx = ufl.dx
lmbda = ufl.conditional(ufl.gt(ufl.dot(w, n), 0), 1, 0)
u_inlet = fem.Constant(msh, PETSc.ScalarType(0.0))
f_source = fem.Constant(msh, PETSc.ScalarType(1.0))
F = 0
F += -ufl.inner(w * u, ufl.grad(v)) * dx
F += ufl.inner(2 * ufl.avg(lmbda * w * u), ufl.jump(v, n)) * dS
F_inlet_adv = -ufl.inner((1 - lmbda) * ufl.dot(w, n) * u_inlet, v) * ds(inlet_id)
F_outlet_surf = ufl.inner(lmbda * ufl.dot(w, n) * u, v) * ds(outlet_id)
F += F_inlet_adv + F_outlet_surf
F += D * ufl.inner(ufl.grad(u), ufl.grad(v)) * dx
F += -D * ufl.inner(ufl.avg(ufl.grad(u)), ufl.jump(v, n)) * dS
F += -D * ufl.inner(ufl.jump(u, n), ufl.avg(ufl.grad(v))) * dS
F += D * (penalty / ufl.avg(h)) * ufl.inner(ufl.jump(u, n), ufl.jump(v, n)) * dS
F_inlet_nitsche = D * (
-ufl.inner(ufl.grad(u), v * n) * ds(inlet_id)
- ufl.inner(ufl.grad(v), (u - u_inlet) * n) * ds(inlet_id)
+ (penalty / h) * ufl.inner(u - u_inlet, v) * ds(inlet_id)
)
F += F_inlet_nitsche
F_inlet_terms = F_inlet_adv + F_inlet_nitsche
F += -ufl.inner(f_source, v) * dx
problem = NonlinearProblem(
F, u, J=ufl.derivative(F, u),
petsc_options_prefix="advecdiff",
petsc_options={
"snes_type": "newtonls",
"snes_linesearch_type": "none",
"snes_stol": np.sqrt(np.finfo(dolfinx.default_real_type).eps) * 1e-2,
"snes_atol": 1e-10,
"snes_rtol": 1e-10,
"snes_max_it": 30,
"snes_divergence_tolerance": "PETSC_UNLIMITED",
"ksp_type": "preonly",
"pc_type": "lu",
"pc_factor_mat_solver_type": "petsc",
},
)
u = problem.solve()
u.x.scatter_forward()
msh.topology.create_connectivity(fdim, tdim)
msh.topology.create_connectivity(tdim, tdim)
owned_size = V.dofmap.index_map.size_local * V.dofmap.index_map_bs
def get_owned_dofs(marker):
facets = mt.find(marker)
f_to_c = msh.topology.connectivity(fdim, tdim)
cells = np.unique(np.concatenate([f_to_c.links(f) for f in facets]))
dofs = fem.locate_dofs_topological(V, tdim, cells)
return dofs[dofs < owned_size]
inlet_dofs = get_owned_dofs(inlet_id)
outlet_dofs = get_owned_dofs(outlet_id)
def compute_consistent_flux(residual_form: fem.Form, dofs: np.ndarray):
residual = fem.assemble_vector(residual_form)
residual.scatter_reverse(dolfinx.la.InsertMode.add)
residual.scatter_forward()
local_flux = np.sum(residual.array[dofs])
return msh.comm.allreduce(local_flux, op=MPI.SUM)
F_no_outlet = fem.form(F - F_outlet_surf)
F_no_inlet = fem.form(F - F_inlet_terms)
flux_outlet = -compute_consistent_flux(F_no_outlet, outlet_dofs)
flux_inlet = -compute_consistent_flux(F_no_inlet, inlet_dofs)
source_total = msh.comm.allreduce(
fem.assemble_scalar(fem.form(f_source * dx)), op=MPI.SUM
)
return source_total, flux_outlet, flux_inlet
outlet_x = 5.0
D_value = 1.0
pe_values = np.geomspace(1e-03, 1e3, num=50)
sources = []
inlet_data = []
outlet_data = []
for pe in pe_values:
w_max = pe * D_value / outlet_x
source_total, flux_outlet, flux_inlet = run_advection_diffusion(
w_max=w_max, D_value=D_value
)
sources.append(source_total)
inlet_data.append(flux_inlet)
outlet_data.append(flux_outlet)
f_blue = (26/255, 72/255, 72/255)
f_yellow = (247/255, 176/255, 0/255)
fig, ax = plt.subplots(figsize=(8, 6))
ax.stackplot(pe_values, inlet_data, outlet_data,
labels=['Inlet flux', 'Outlet flux'],
colors=[f_blue, f_yellow], alpha=0.5)
ax.hlines(sources[0], pe_values[0], pe_values[-1],
colors='k', linestyles='dashed', label='Source integral')
source_text = f"Source: {sources[0]:.0f}" + r" m$^{-3}$$\,$s$^{-1}$"
ax.annotate(source_text, xy=(pe_values[-1]*1.05, sources[0]*1.02))
ax.annotate("Inlet flux", xy=(1e-2, 1.5), ha='center', va="center", fontsize=12, color=f_blue)
ax.annotate("Outlet flux", xy=(1e2, 3.5), ha='center', va="center", fontsize=12, color=f_yellow)
ax.set_xlim(pe_values[0], 1e04)
ax.set_ylim(0, 5.3)
ax.set_xscale('log')
ax.set_xlabel('Péclet number', loc='right', fontsize=18)
ax.set_title(r'Flux (m$^{2}$$\,$s$^{-1}$)', loc='left', fontsize=18)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.set_xticks(np.geomspace(1e-03, 1e3, num=7))
plt.tight_layout()
fig.savefig('../images/mwe2_pe_sweep.svg', bbox_inches='tight')
plt.close(fig)