+ * If a previous filters {@link Map} was assigned this new filter instance will be assigned + * into that existing map, it is not cleared or reallocated. The descriptor should either be + * an Symbol or UnsignedLong that aligns with the filters definition being used. + * + * @param name + * The name to use when adding the described filter to the filters {@link Map}. + * @param descriptor + * The descriptor used for the {@link DescribedType} that will carry the filter. + * @param filter + * The filter value to assign to the filter {@link DescribedType}. + * + * @return this {@link SourceOptions} instance. + */ + public SourceOptions addFilter(String name, Object descriptor, Object filter) { + if (filters == null) { + filters = new HashMap<>(); + } + + filters.put(name, new FilterDescribedType(descriptor, filter)); + + return self(); + } + /** * @return the configured default outcome as a {@link DeliveryState} instance. */ @@ -139,4 +167,54 @@ public SourceOptions outcomes(DeliveryState.Type... outcomes) { SourceOptions self() { return this; } + + private static class FilterDescribedType implements DescribedType { + + private final Object descriptor; + private final Object described; + + public FilterDescribedType(Object descriptor, Object described) { + this.descriptor = descriptor; + this.described = described; + } + + @Override + public Object getDescriptor() { + return descriptor; + } + + @Override + public Object getDescribed() { + return this.described; + } + + @Override + public String toString() { + return "FilterDescribedType{ descriptor:" + descriptor + ", described:" + described + " }"; + } + + @Override + public int hashCode() { + return Objects.hash(described, descriptor); + } + + @Override + public boolean equals(Object target) { + if (this == target) { + return true; + } + + if (target == null) { + return false; + } + + if (!(target instanceof DescribedType)) { + return false; + } + + final DescribedType other = (DescribedType) target; + + return Objects.equals(descriptor, other.getDescriptor()) && Objects.equals(described, other.getDescribed()); + } + } } diff --git a/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java b/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java index 7e635f97c..97b6e9ef0 100644 --- a/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java +++ b/src/test/java/com/rabbitmq/client/amqp/docs/WebsiteDocumentation.java @@ -11,9 +11,10 @@ import com.rabbitmq.client.amqp.Publisher; import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; -import java.nio.charset.StandardCharsets; import java.time.Duration; +import static java.nio.charset.StandardCharsets.UTF_8; + public class WebsiteDocumentation { void environment() { @@ -48,7 +49,7 @@ void publishing() { // create the message Message message = publisher - .message("hello".getBytes(StandardCharsets.UTF_8)) + .message("hello".getBytes(UTF_8)) .messageId(1L); // publish the message and deal with broker feedback @@ -326,4 +327,52 @@ void deactivateRecovery() { .activated(false) .connectionBuilder().build(); } + + void propertyFilterExpressions() { + Connection connection = null; + Consumer consumer = connection.consumerBuilder() + .stream().filter() + .userId("John".getBytes(UTF_8)) + .subject("&p:Order") + .property("region", "emea") + .stream().builder() + .queue("my-queue") + .messageHandler((ctx, msg ) -> { + // message processing + }) + .build(); + } + + void sqlFilterExpressions() { + Connection connection = null; + Consumer consumer = connection.consumerBuilder() + .stream().filter() + .sql("properties.user_id = 'John' AND " + + "properties.subject LIKE 'Order%' AND " + + "region = 'emea'") + .stream().builder() + .queue("my-queue") + .messageHandler((ctx, msg ) -> { + // message processing + }) + .build(); + } + + void combinedFilterExpressions() { + Connection connection = null; + Consumer consumer = connection.consumerBuilder() + .stream() + .filterValues("order.created") + .filter() + .sql("p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)") + .stream().builder() + .queue("my-queue") + .messageHandler((ctx, msg ) -> { + // message processing + }) + .build(); + } }