
Error when Integrating openlineage api with Trino's OpenLineage event listener #13011

Open
@viethqb

Description

Describe the bug
I tried integrating Trino's OpenLineage event listener (https://trino.io/docs/current/admin/event-listeners-openlineage.html) with DataHub, hoping it would generate data lineage in DataHub automatically (the same setup works with Marquez), but I got an error.
The GMS API receives the event from Trino but fails while processing it.

I deployed DataHub using Helm:

helm -n datahub ls
# NAME         	NAMESPACE	REVISION	UPDATED                                	STATUS  	CHART                       	APP VERSION
# datahub      	datahub  	1       	2025-03-28 00:59:58.497016883 +0700 +07	deployed	datahub-0.5.13              	v1.0.0     
# prerequisites	datahub  	1       	2025-03-27 23:47:19.39385178 +0700 +07 	deployed	datahub-prerequisites-0.1.15

My Trino config:

eventListenerProperties:
  - event-listener.name=openlineage
  - openlineage-event-listener.trino.uri=http://trino.trino:8080
  # - openlineage-event-listener.namespace=codespaces.trino
  - openlineage-event-listener.transport.type=HTTP
  - openlineage-event-listener.transport.url=http://datahub.lakehouse.local
  - openlineage-event-listener.transport.endpoint=/openapi/openlineage/api/v1/lineage
  - openlineage-event-listener.transport.api-key=<api_key>
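
To reproduce the failure without running a Trino query, a minimal sketch like the following could post a hand-built START event to the same endpoint. It assumes the OpenLineage HTTP transport joins `transport.url` and `transport.endpoint` into one URL and sends the API key as a `Bearer` token (my understanding of the OpenLineage Java client, not something confirmed in this report). Field values are copied from the event in the GMS log below; `build_url`, `build_start_event`, and `post_event` are hypothetical helper names.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Values mirror the Trino config above; replace <api_key> with a real DataHub token.
TRANSPORT_URL = "http://datahub.lakehouse.local"
ENDPOINT = "/openapi/openlineage/api/v1/lineage"
API_KEY = "<api_key>"

TRINO_PRODUCER = "https://github.com/trinodb/trino/plugin/trino-openlineage"


def build_url(base: str, endpoint: str) -> str:
    """Join base URL and endpoint path without doubling the slash (assumed transport behavior)."""
    return base.rstrip("/") + "/" + endpoint.lstrip("/")


def build_start_event(job_name: str, query: str) -> dict:
    """Build a minimal OpenLineage START RunEvent shaped like the one Trino sent."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "producer": TRINO_PRODUCER,
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "eventType": "START",
        "run": {
            "runId": str(uuid.uuid4()),
            "facets": {
                "processing_engine": {
                    "_producer": TRINO_PRODUCER,
                    "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
                    "version": "466",
                    "name": "trino",
                }
            },
        },
        "job": {
            "namespace": "trino://trino.trino:8080",
            "name": job_name,
            "facets": {
                "sql": {
                    "_producer": TRINO_PRODUCER,
                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json#/$defs/SQLJobFacet",
                    "query": query,
                }
            },
        },
    }


def post_event(event: dict) -> int:
    """POST the event to GMS and return the HTTP status; urlopen raises HTTPError on 4xx/5xx."""
    req = urllib.request.Request(
        build_url(TRANSPORT_URL, ENDPOINT),
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {API_KEY}"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status
```

Calling `post_event(build_start_event("manual_test_1", "select 1"))` from a pod that can reach GMS should, if the converter behaves as in the log below, raise `urllib.error.HTTPError` with status 500, matching the Trino-side error.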

GMS log:

2025-03-28 01:20:35,022 [qtp1015958146-34] INFO  i.d.o.o.controller.LineageApiImpl:59 - Received lineage event: {"eventTime":"2025-03-28T01:20:35.014Z","producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"START","run":{"runId":"7d316be5-b6bd-3888-8a4b-08b5cc34e65c","facets":{"processing_engine":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"466","name":"trino"},"trino_metadata":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","transaction_id":"0f4839d2-dc3b-4fcf-9493-fadb006b2dbf"},"trino_query_context":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","server_address":"10.244.0.53","environment":"production","query_type":"INSERT"}}},"job":{"namespace":"trino://trino.trino:8080","name":"20250328_012035_00132_is3yk","facets":{"jobType":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"TRINO","jobType":"QUERY"},"sql":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json#/$defs/SQLJobFacet","query":"create table lakehouse.test.customer\nas\nselect * from tpcds.sf300.customer"}}}}
2025-03-28 01:20:35,022 [qtp1015958146-34] INFO  i.d.o.o.controller.LineageApiImpl:61 - Deserialized to lineage event: io.openlineage.client.OpenLineage$RunEvent@688250a1
2025-03-28 01:20:35,023 [qtp1015958146-34] INFO  i.d.metadata.context.RequestContext:53 - RequestContext{actorUrn='urn:li:corpuser:datahub', sourceIP='10.244.0.53', requestAPI=OPENAPI, requestID='postRunEventRaw', userAgent='Apache-HttpClient/5.4.1 (Java/23.0.1)'}
2025-03-28 01:20:35,031 [qtp1015958146-34] INFO  i.d.o.o.controller.LineageApiImpl:81 - PostRun received lineage event: io.openlineage.client.OpenLineage$RunEvent@688250a1
2025-03-28 01:20:35,031 [qtp1015958146-34] INFO  i.d.o.converter.OpenLineageToDataHub:286 - Emitting lineage: {"eventTime":"2025-03-28T01:20:35.014Z","producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"START","run":{"runId":"7d316be5-b6bd-3888-8a4b-08b5cc34e65c","facets":{"processing_engine":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"466","name":"trino"},"trino_query_context":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","server_address":"10.244.0.53","environment":"production","query_type":"INSERT"},"trino_metadata":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","transaction_id":"0f4839d2-dc3b-4fcf-9493-fadb006b2dbf"}}},"job":{"namespace":"trino://trino.trino:8080","name":"20250328_012035_00132_is3yk","facets":{"jobType":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"TRINO","jobType":"QUERY"},"sql":{"_producer":"https://github.com/trinodb/trino/plugin/trino-openlineage","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json#/$defs/SQLJobFacet","query":"create table lakehouse.test.customer\nas\nselect * from tpcds.sf300.customer"}}}}
2025-03-28 01:20:35,032 [qtp1015958146-34] ERROR i.d.o.o.controller.LineageApiImpl:64 - java.lang.RuntimeException: Unable to determine orchestrator
java.lang.RuntimeException: java.lang.RuntimeException: Unable to determine orchestrator
	at io.datahubproject.openapi.openlineage.controller.LineageApiImpl.postRunEventRaw(LineageApiImpl.java:99)
	at io.datahubproject.openapi.openlineage.controller.LineageApiImpl.postRunEventRaw(LineageApiImpl.java:62)
	at io.datahubproject.openlineage.generated.controller.LineageApi._postRunEventRaw(LineageApi.java:66)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:355)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
	at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:174)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:768)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:720)
	at io.datahubproject.openapi.openlineage.controller.LineageApiImpl$$SpringCGLIB$$0._postRunEventRaw(<generated>)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:255)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:188)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:926)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:831)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979)
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014)
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914)
	at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:547)
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885)
	at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:614)
	at org.eclipse.jetty.ee10.servlet.ServletHolder.handle(ServletHolder.java:736)
	at org.eclipse.jetty.ee10.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1614)
	at org.eclipse.jetty.ee10.websocket.servlet.WebSocketUpgradeFilter.doFilter(WebSocketUpgradeFilter.java:195)
	at org.eclipse.jetty.ee10.servlet.FilterHolder.doFilter(FilterHolder.java:205)
	at org.eclipse.jetty.ee10.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1586)
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
	at org.eclipse.jetty.ee10.servlet.FilterHolder.doFilter(FilterHolder.java:205)
	at org.eclipse.jetty.ee10.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1586)
	at com.datahub.auth.authentication.filter.AuthenticationFilter.doFilterInternal(AuthenticationFilter.java:127)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
	at org.eclipse.jetty.ee10.servlet.FilterHolder.doFilter(FilterHolder.java:205)
	at org.eclipse.jetty.ee10.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1586)
	at org.eclipse.jetty.ee10.servlet.ServletHandler$MappedServlet.handle(ServletHandler.java:1547)
	at org.eclipse.jetty.ee10.servlet.ServletChannel.dispatch(ServletChannel.java:824)
	at org.eclipse.jetty.ee10.servlet.ServletChannel.handle(ServletChannel.java:436)
	at org.eclipse.jetty.ee10.servlet.ServletHandler.handle(ServletHandler.java:464)
	at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:575)
	at org.eclipse.jetty.ee10.servlet.SessionHandler.handle(SessionHandler.java:703)
	at org.eclipse.jetty.server.handler.ContextHandler.handle(ContextHandler.java:1060)
	at org.eclipse.jetty.server.Server.handle(Server.java:182)
	at org.eclipse.jetty.server.internal.HttpChannelState$HandlerInvoker.run(HttpChannelState.java:662)
	at org.eclipse.jetty.server.internal.HttpConnection.onFillable(HttpConnection.java:418)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:322)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99)
	at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:480)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:443)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:293)
	at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:201)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:311)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:979)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1209)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1164)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Unable to determine orchestrator
	at io.datahubproject.openlineage.converter.OpenLineageToDataHub.getOrchestrator(OpenLineageToDataHub.java:934)
	at io.datahubproject.openlineage.converter.OpenLineageToDataHub.getFlowUrn(OpenLineageToDataHub.java:904)
	at io.datahubproject.openlineage.converter.OpenLineageToDataHub.convertRunEventToJob(OpenLineageToDataHub.java:290)
	at io.datahubproject.openapi.openlineage.mapping.RunEventMapper.map(RunEventMapper.java:24)
	at io.datahubproject.openapi.openlineage.controller.LineageApiImpl.postRunEventRaw(LineageApiImpl.java:91)
	... 67 common frames omitted
2025-03-28 01:20:39,178 [pool-14-thread-1] WARN  org.opensearch.client.RestClient:85 - request [POST http://elasticsearch-master:9200/datahubpolicyindex_v2/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512&ccs_minimize_roundtrips=false] returned 1 warnings: [299 Elasticsearch-7.17.3-5ad023604c8d7416c9eb6c0eadb62b14e766caff "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security."]
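
The root cause in the trace is `OpenLineageToDataHub.getOrchestrator`, called from `getFlowUrn` while the converter builds the DataFlow URN. The sketch below is a hypothetical Python model of that step, not DataHub's code: it assumes the converter infers the orchestrator by matching the event's `producer` URI against the URL pattern used by OpenLineage's own integrations (`https://github.com/OpenLineage/OpenLineage/<ref>/integration/<name>`). Under that assumption, Trino's producer, `https://github.com/trinodb/trino/plugin/trino-openlineage`, matches nothing, even though the event carries `processing_engine.name = "trino"`. The functions `resolve_orchestrator` and `processing_engine_name` and the Spark-style producer URL are illustrative only.

```python
import re
from typing import Optional

# ASSUMPTION: pattern modeled on how OpenLineage's own integrations name their producer;
# DataHub's real matching rules are not shown in this report.
OPENLINEAGE_INTEGRATION = re.compile(
    r"https://github\.com/OpenLineage/OpenLineage/.*/integration/(.*)"
)


def resolve_orchestrator(event: dict) -> Optional[str]:
    """Hypothetical model: orchestrator inferred from `producer`, or None if unrecognized."""
    match = OPENLINEAGE_INTEGRATION.match(event.get("producer", ""))
    return match.group(1) if match else None


def processing_engine_name(event: dict) -> Optional[str]:
    """Return run.facets.processing_engine.name if the event carries it, else None."""
    facets = event.get("run", {}).get("facets", {})
    return facets.get("processing_engine", {}).get("name")


# Trimmed copy of the START event from the GMS log above.
trino_event = {
    "producer": "https://github.com/trinodb/trino/plugin/trino-openlineage",
    "eventType": "START",
    "run": {"facets": {"processing_engine": {"version": "466", "name": "trino"}}},
    "job": {"namespace": "trino://trino.trino:8080", "name": "20250328_012035_00132_is3yk"},
}

# Illustrative producer in the OpenLineage integration style (hypothetical version ref).
spark_like_event = {
    "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.25.0/integration/spark",
}
```

Under this model, `resolve_orchestrator(trino_event)` returns `None`, which corresponds to the "Unable to determine orchestrator" exception, while `processing_engine_name(trino_event)` still returns `"trino"`.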

Trino log:

2025-03-28T01:30:06.124Z	WARN	dispatcher-query-3	io.trino.eventlistener.EventListenerManager	Failed to publish QueryCompletedEvent for query 20250328_012945_00066_h4ttw
io.openlineage.client.transports.HttpTransportResponseException: code: 500, response: 
	at io.openlineage.client.transports.HttpTransport.throwOnHttpError(HttpTransport.java:197)
	at io.openlineage.client.transports.HttpTransport.lambda$emit$1(HttpTransport.java:163)
	at org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:247)
	at org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:188)
	at org.apache.hc.client5.http.impl.classic.CloseableHttpClient.execute(CloseableHttpClient.java:162)
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:160)
	at io.openlineage.client.transports.HttpTransport.emit(HttpTransport.java:140)
	at io.openlineage.client.OpenLineageClient.lambda$emit$0(OpenLineageClient.java:86)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
	at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:86)
	at io.trino.plugin.openlineage.OpenLineageListener.queryCompleted(OpenLineageListener.java:103)
	at io.trino.eventlistener.EventListenerManager.doQueryCompleted(EventListenerManager.java:163)
	at io.trino.eventlistener.EventListenerManager.queryCompleted(EventListenerManager.java:153)
	at io.trino.event.QueryMonitor.queryCompletedEvent(QueryMonitor.java:262)
	at io.trino.execution.QueryStateMachine.lambda$addQueryInfoStateChangeListener$19(QueryStateMachine.java:1280)
	at io.trino.execution.StateMachine.fireStateChangedListener(StateMachine.java:240)
	at io.trino.execution.StateMachine.lambda$fireStateChanged$0(StateMachine.java:232)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79)
	at io.trino.$gen.Trino_466____20250328_012615_2.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1575)

Metadata

Assignees: No one assigned
Labels: bug (Bug report)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests